From d2f9bc48ab57b860914dafa36fb6c8f0b7c9d417 Mon Sep 17 00:00:00 2001 From: Axel Suarez Martinez Date: Wed, 9 Jul 2025 13:17:52 -0500 Subject: [PATCH 1/7] Streaming responses --- .../microsoft/agents/builder/turn_context.py | 19 + .../connector/client/connector_client.py | 4 +- .../microsoft/agents/core/models/__init__.py | 18 + .../microsoft/agents/core/models/activity.py | 4 +- .../microsoft/agents/core/models/ai_entity.py | 142 +++++++ .../agents/hosting/aiohttp/__init__.py | 10 + .../agents/hosting/aiohttp/app/__init__.py | 16 + .../hosting/aiohttp/app/streaming/__init__.py | 13 + .../hosting/aiohttp/app/streaming/citation.py | 22 + .../aiohttp/app/streaming/citation_util.py | 85 ++++ .../app/streaming/streaming_response.py | 382 ++++++++++++++++++ test_samples/app_style/streaming_agent.py | 112 +++++ 12 files changed, 824 insertions(+), 3 deletions(-) create mode 100644 libraries/Core/microsoft-agents-core/microsoft/agents/core/models/ai_entity.py create mode 100644 libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/__init__.py create mode 100644 libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/__init__.py create mode 100644 libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/citation.py create mode 100644 libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/citation_util.py create mode 100644 libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py create mode 100644 test_samples/app_style/streaming_agent.py diff --git a/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py b/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py index d2d49402..ef4a32ac 100644 --- a/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py +++ b/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py @@ -124,6 +124,25 @@ def services(self): """ return self._services + @property + def streaming_response(self): + """ + Gets a StreamingResponse instance for this turn context. + This allows for streaming partial responses to the client. + """ + # Use lazy import to avoid circular dependency + if not hasattr(self, "_streaming_response"): + try: + from microsoft.agents.hosting.aiohttp.app.streaming import ( + StreamingResponse, + ) + + self._streaming_response = StreamingResponse(self) + except ImportError: + # If the hosting library isn't available, return None + self._streaming_response = None + return self._streaming_response + def get(self, key: str) -> object: if not key or not isinstance(key, str): raise TypeError('"key" must be a valid string.') diff --git a/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py b/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py index 5d09d4a1..cd09852e 100644 --- a/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py +++ b/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py @@ -175,7 +175,9 @@ async def reply_to_activity( async with self.client.post( url, - json=body.model_dump(by_alias=True, exclude_unset=True, mode="json"), + json=body.model_dump( + by_alias=True, exclude_unset=True, exclude_none=True, mode="json" + ), ) as response: if response.status >= 400: logger.error(f"Error replying to activity: {response.status}") diff --git a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/__init__.py b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/__init__.py index adf42987..7537dfb9 100644 --- a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/__init__.py +++ b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/__init__.py @@ -25,6 +25,16 @@ from .conversations_result import ConversationsResult from .expected_replies import ExpectedReplies from .entity import Entity +from .ai_entity import ( + AIEntity, + ClientCitation, + ClientCitationAppearance, + ClientCitationImage, + ClientCitationIconName, + SensitivityUsageInfo, + SensitivityPattern, + add_ai_to_activity, +) from .error import Error from .error_response import ErrorResponse from .fact import Fact @@ -109,6 +119,14 @@ "ConversationsResult", "ExpectedReplies", "Entity", + "AIEntity", + "ClientCitation", + "ClientCitationAppearance", + "ClientCitationImage", + "ClientCitationIconName", + "SensitivityUsageInfo", + "SensitivityPattern", + "add_ai_to_activity", "Error", "ErrorResponse", "Fact", diff --git a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py index 37be7d7a..67a359ad 100644 --- a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py +++ b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py @@ -1,7 +1,7 @@ from copy import copy from datetime import datetime, timezone from typing import Optional -from pydantic import Field +from pydantic import Field, SerializeAsAny from .activity_types import ActivityTypes from .channel_account import ChannelAccount from .conversation_account import ConversationAccount @@ -145,7 +145,7 @@ class Activity(AgentsModel): summary: NonEmptyString = None suggested_actions: SuggestedActions = None attachments: list[Attachment] = None - entities: list[Entity] = None + entities: list[SerializeAsAny[Entity]] = None channel_data: object = None action: NonEmptyString = None reply_to_id: NonEmptyString = None diff --git a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/ai_entity.py b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/ai_entity.py new file mode 100644 index 00000000..f7517f70 --- /dev/null +++ b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/ai_entity.py @@ -0,0 +1,142 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from enum import Enum +from typing import List, Optional, Union, Literal +from dataclasses import dataclass + +from .agents_model import AgentsModel +from .activity import Activity +from .entity import Entity + + +class ClientCitationIconName(str, Enum): + """Enumeration of supported citation icon names.""" + + MICROSOFT_WORD = "Microsoft Word" + MICROSOFT_EXCEL = "Microsoft Excel" + MICROSOFT_POWERPOINT = "Microsoft PowerPoint" + MICROSOFT_ONENOTE = "Microsoft OneNote" + MICROSOFT_SHAREPOINT = "Microsoft SharePoint" + MICROSOFT_VISIO = "Microsoft Visio" + MICROSOFT_LOOP = "Microsoft Loop" + MICROSOFT_WHITEBOARD = "Microsoft Whiteboard" + ADOBE_ILLUSTRATOR = "Adobe Illustrator" + ADOBE_PHOTOSHOP = "Adobe Photoshop" + ADOBE_INDESIGN = "Adobe InDesign" + ADOBE_FLASH = "Adobe Flash" + SKETCH = "Sketch" + SOURCE_CODE = "Source Code" + IMAGE = "Image" + GIF = "GIF" + VIDEO = "Video" + SOUND = "Sound" + ZIP = "ZIP" + TEXT = "Text" + PDF = "PDF" + + +class ClientCitationImage(AgentsModel): + """Information about the citation's icon.""" + + type: str = "ImageObject" + name: str = "" + + +class SensitivityPattern(AgentsModel): + """Pattern information for sensitivity usage info.""" + + type: str = "DefinedTerm" + in_defined_term_set: str = "" + name: str = "" + term_code: str = "" + + +class SensitivityUsageInfo(AgentsModel): + """ + Sensitivity usage info for content sent to the user. + This is used to provide information about the content to the user. + """ + + type: str = "https://schema.org/Message" + schema_type: str = "CreativeWork" + description: Optional[str] = None + name: str = "" + position: Optional[int] = None + pattern: Optional[SensitivityPattern] = None + + +class ClientCitationAppearance(AgentsModel): + """Appearance information for a client citation.""" + + type: str = "DigitalDocument" + name: str = "" + text: Optional[str] = None + url: Optional[str] = None + abstract: str = "" + encoding_format: Optional[str] = None + image: Optional[ClientCitationImage] = None + keywords: Optional[List[str]] = None + usage_info: Optional[SensitivityUsageInfo] = None + + +class ClientCitation(AgentsModel): + """ + Represents a Teams client citation to be included in a message. + See Bot messages with AI-generated content for more details. + https://learn.microsoft.com/en-us/microsoftteams/platform/bots/how-to/bot-messages-ai-generated-content?tabs=before%2Cbotmessage + """ + + type: str = "Claim" + position: int = 0 + appearance: Optional[ClientCitationAppearance] = None + + def __post_init__(self): + if self.appearance is None: + self.appearance = ClientCitationAppearance() + + +class AIEntity(Entity): + """Entity indicating AI-generated content.""" + + type: str = "https://schema.org/Message" + schema_type: str = "Message" + context: str = "https://schema.org" + id: str = "" + additional_type: Optional[List[str]] = None + citation: Optional[List[ClientCitation]] = None + usage_info: Optional[SensitivityUsageInfo] = None + + def __post_init__(self): + if self.additional_type is None: + self.additional_type = ["AIGeneratedContent"] + + +def add_ai_to_activity( + activity: Activity, + citations: Optional[List[ClientCitation]] = None, + usage_info: Optional[SensitivityUsageInfo] = None, +) -> None: + """ + Adds AI entity to an activity to indicate AI-generated content. + + Args: + activity: The activity to modify + citations: Optional list of citations + usage_info: Optional sensitivity usage information + """ + if citations: + ai_entity = AIEntity( + type="https://schema.org/Message", + schema_type="Message", + context="https://schema.org", + id="", + additional_type=["AIGeneratedContent"], + citation=citations, + usage_info=usage_info, + ) + + if activity.entities is None: + activity.entities = [] + + activity.entities.append(ai_entity) diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/__init__.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/__init__.py index 356f003d..315cc0b1 100644 --- a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/__init__.py +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/__init__.py @@ -6,6 +6,12 @@ jwt_authorization_middleware, jwt_authorization_decorator, ) +from .app.streaming import ( + Citation, + CitationUtil, + StreamingResponse, + StreamingChannelData, +) __all__ = [ "start_agent_process", @@ -14,4 +20,8 @@ "jwt_authorization_middleware", "jwt_authorization_decorator", "channel_service_route_table", + "Citation", + "CitationUtil", + "StreamingResponse", + "StreamingChannelData", ] diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/__init__.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/__init__.py new file mode 100644 index 00000000..be107941 --- /dev/null +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from .streaming import ( + Citation, + CitationUtil, + StreamingResponse, + StreamingChannelData, +) + +__all__ = [ + "Citation", + "CitationUtil", + "StreamingResponse", + "StreamingChannelData", +] diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/__init__.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/__init__.py new file mode 100644 index 00000000..f46c8760 --- /dev/null +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from .citation import Citation +from .citation_util import CitationUtil +from .streaming_response import StreamingResponse, StreamingChannelData + +__all__ = [ + "Citation", + "CitationUtil", + "StreamingResponse", + "StreamingChannelData", +] diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/citation.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/citation.py new file mode 100644 index 00000000..f643639a --- /dev/null +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/citation.py @@ -0,0 +1,22 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Optional +from dataclasses import dataclass + + +@dataclass +class Citation: + """Citations returned by the model.""" + + content: str + """The content of the citation.""" + + title: Optional[str] = None + """The title of the citation.""" + + url: Optional[str] = None + """The URL of the citation.""" + + filepath: Optional[str] = None + """The filepath of the document.""" diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/citation_util.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/citation_util.py new file mode 100644 index 00000000..b13ca598 --- /dev/null +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/citation_util.py @@ -0,0 +1,85 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import re +from typing import List, Optional + +from microsoft.agents.core.models import ClientCitation + + +class CitationUtil: + """Utility functions for manipulating text and citations.""" + + @staticmethod + def snippet(text: str, max_length: int) -> str: + """ + Clips the text to a maximum length in case it exceeds the limit. + + Args: + text: The text to clip. + max_length: The maximum length of the text to return, cutting off the last whole word. + + Returns: + The modified text + """ + if len(text) <= max_length: + return text + + snippet = text[:max_length] + snippet = snippet[: min(len(snippet), snippet.rfind(" "))] + snippet += "..." + return snippet + + @staticmethod + def format_citations_response(text: str) -> str: + """ + Convert citation tags `[doc(s)n]` to `[n]` where n is a number. + + Args: + text: The text to format. + + Returns: + The formatted text. + """ + return re.sub(r"\[docs?(\d+)\]", r"[\1]", text, flags=re.IGNORECASE) + + @staticmethod + def get_used_citations( + text: str, citations: List[ClientCitation] + ) -> Optional[List[ClientCitation]]: + """ + Get the citations used in the text. This will remove any citations that are + included in the citations array from the response but not referenced in the text. + + Args: + text: The text to search for citation references, i.e. [1], [2], etc. + citations: The list of citations to search for. + + Returns: + The list of citations used in the text. + """ + regex = re.compile(r"\[(\d+)\]", re.IGNORECASE) + matches = regex.findall(text) + + if not matches: + return None + + # Remove duplicates + filtered_matches = set(matches) + + # Add citations + used_citations = [] + for match in filtered_matches: + citation_ref = f"[{match}]" + found = next( + ( + citation + for citation in citations + if f"[{citation.position}]" == citation_ref + ), + None, + ) + if found: + used_citations.append(found) + + return used_citations if used_citations else None diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py new file mode 100644 index 00000000..0d888a12 --- /dev/null +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py @@ -0,0 +1,382 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +import logging +from typing import List, Optional, Callable, Literal, TYPE_CHECKING +from dataclasses import dataclass + +from microsoft.agents.core.models import ( + Activity, + Entity, + Attachment, + ClientCitation, + SensitivityUsageInfo, + add_ai_to_activity, +) + +if TYPE_CHECKING: + from microsoft.agents.builder.turn_context import TurnContext + +from .citation import Citation +from .citation_util import CitationUtil + +logger = logging.getLogger(__name__) + + +@dataclass +class StreamingChannelData: + """ + Structure of the outgoing channelData field for streaming responses. + + The expected sequence of streamTypes is: + 'informative', 'streaming', 'streaming', ..., 'final'. + + Once a 'final' message is sent, the stream is considered ended. + """ + + stream_type: Literal["informative", "streaming", "final"] + """The type of message being sent.""" + + stream_sequence: int + """Sequence number of the message in the stream. Starts at 1 for the first message.""" + + stream_id: Optional[str] = None + """ID of the stream. Assigned after the initial update is sent.""" + + +class StreamingResponse: + """ + A helper class for streaming responses to the client. + + This class is used to send a series of updates to the client in a single response. + The expected sequence of calls is: + + `queue_informative_update()`, `queue_text_chunk()`, `queue_text_chunk()`, ..., `end_stream()`. + + Once `end_stream()` is called, the stream is considered ended and no further updates can be sent. + """ + + def __init__(self, context: "TurnContext"): + """ + Creates a new StreamingResponse instance. + + Args: + context: Context for the current turn of conversation with the user. + """ + self._context = context + self._next_sequence = 1 + self._stream_id: Optional[str] = None + self._message = "" + self._attachments: Optional[List[Attachment]] = None + self._ended = False + + # Queue for outgoing activities + self._queue: List[Callable[[], Activity]] = [] + self._queue_sync: Optional[asyncio.Task] = None + self._chunk_queued = False + + # Powered by AI feature flags + self._enable_feedback_loop = False + self._feedback_loop_type: Optional[Literal["default", "custom"]] = None + self._enable_generated_by_ai_label = False + self._citations: Optional[List[ClientCitation]] = [] + self._sensitivity_label: Optional[SensitivityUsageInfo] = None + + @property + def stream_id(self) -> Optional[str]: + """ + Gets the stream ID of the current response. + Assigned after the initial update is sent. + """ + return self._stream_id + + @property + def citations(self) -> Optional[List[ClientCitation]]: + """Gets the citations of the current response.""" + return self._citations + + @property + def updates_sent(self) -> int: + """Gets the number of updates sent for the stream.""" + return self._next_sequence - 1 + + def queue_informative_update(self, text: str) -> None: + """ + Queues an informative update to be sent to the client. + + Args: + text: Text of the update to send. + """ + if self._ended: + raise RuntimeError("The stream has already ended.") + + # Queue a typing activity + def create_activity(): + activity = Activity( + type="typing", + text=text, + channel_data={ + "streamType": "informative", + "streamSequence": self._next_sequence, + }, + ) + self._next_sequence += 1 + return activity + + self._queue_activity(create_activity) + + def queue_text_chunk( + self, text: str, citations: Optional[List[Citation]] = None + ) -> None: + """ + Queues a chunk of partial message text to be sent to the client. + + The text will be sent as quickly as possible to the client. + Chunks may be combined before delivery to the client. + + Args: + text: Partial text of the message to send. + citations: Citations to be included in the message. + """ + if self._ended: + raise RuntimeError("The stream has already ended.") + + # Update full message text + self._message += text + + # If there are citations, modify the content so that the sources are numbers instead of [doc1], [doc2], etc. + self._message = CitationUtil.format_citations_response(self._message) + + # Queue the next chunk + self._queue_next_chunk() + + async def end_stream(self) -> None: + """ + Ends the stream by sending the final message to the client. + """ + if self._ended: + raise RuntimeError("The stream has already ended.") + + # Queue final message + self._ended = True + self._queue_next_chunk() + + # Wait for the queue to drain + await self.wait_for_queue() + + def set_attachments(self, attachments: List[Attachment]) -> None: + """ + Sets the attachments to attach to the final chunk. + + Args: + attachments: List of attachments. + """ + self._attachments = attachments + + def set_sensitivity_label(self, sensitivity_label: SensitivityUsageInfo) -> None: + """ + Sets the sensitivity label to attach to the final chunk. + + Args: + sensitivity_label: The sensitivity label. + """ + self._sensitivity_label = sensitivity_label + + def set_citations(self, citations: List[Citation]) -> None: + """ + Sets the citations for the full message. + + Args: + citations: Citations to be included in the message. + """ + if citations: + if not self._citations: + self._citations = [] + + curr_pos = len(self._citations) + + for citation in citations: + client_citation = ClientCitation( + type="Claim", + position=curr_pos + 1, + appearance={ + "type": "DigitalDocument", + "name": citation.title or f"Document #{curr_pos + 1}", + "abstract": CitationUtil.snippet(citation.content, 477), + }, + ) + curr_pos += 1 + self._citations.append(client_citation) + + def set_feedback_loop(self, enable_feedback_loop: bool) -> None: + """ + Sets the Feedback Loop in Teams that allows a user to + give thumbs up or down to a response. + Default is False. + + Args: + enable_feedback_loop: If true, the feedback loop is enabled. + """ + self._enable_feedback_loop = enable_feedback_loop + + def set_feedback_loop_type( + self, feedback_loop_type: Literal["default", "custom"] + ) -> None: + """ + Sets the type of UI to use for the feedback loop. + + Args: + feedback_loop_type: The type of the feedback loop. + """ + self._feedback_loop_type = feedback_loop_type + + def set_generated_by_ai_label(self, enable_generated_by_ai_label: bool) -> None: + """ + Sets the Generated by AI label in Teams. + Default is False. + + Args: + enable_generated_by_ai_label: If true, the label is added. + """ + self._enable_generated_by_ai_label = enable_generated_by_ai_label + + def get_message(self) -> str: + """ + Returns the most recently streamed message. + """ + return self._message + + async def wait_for_queue(self) -> None: + """ + Waits for the outgoing activity queue to be empty. + """ + if self._queue_sync: + await self._queue_sync + + def _queue_next_chunk(self) -> None: + """ + Queues the next chunk of text to be sent to the client. + """ + # Are we already waiting to send a chunk? + if self._chunk_queued: + return + + # Queue a chunk of text to be sent + self._chunk_queued = True + + def create_activity(): + self._chunk_queued = False + if self._ended: + # Send final message + activity = Activity( + type="message", + text=self._message or "end stream response", + attachments=self._attachments or [], + channel_data={ + "streamType": "final", + "streamSequence": self._next_sequence, + }, + ) + else: + # Send typing activity + activity = Activity( + type="typing", + text=self._message, + channel_data={ + "streamType": "streaming", + "streamSequence": self._next_sequence, + }, + ) + self._next_sequence += 1 + return activity + + self._queue_activity(create_activity) + + def _queue_activity(self, factory: Callable[[], Activity]) -> None: + """ + Queues an activity to be sent to the client. + """ + self._queue.append(factory) + + # If there's no sync in progress, start one + if not self._queue_sync: + self._queue_sync = asyncio.create_task(self._drain_queue()) + + async def _drain_queue(self) -> None: + """ + Sends any queued activities to the client until the queue is empty. + """ + try: + logger.debug(f"Draining queue with {len(self._queue)} activities.") + while self._queue: + factory = self._queue.pop(0) + activity = factory() + await self._send_activity(activity) + except Exception as err: + logger.error(f"Error occurred when sending activity while streaming: {err}") + raise + finally: + self._queue_sync = None + + async def _send_activity(self, activity: Activity) -> None: + """ + Sends an activity to the client and saves the stream ID returned. + + Args: + activity: The activity to send. + """ + # Set activity ID to the assigned stream ID + if self._stream_id: + activity.id = self._stream_id + if not activity.channel_data: + activity.channel_data = {} + activity.channel_data["streamId"] = self._stream_id + + if not activity.entities: + activity.entities = [] + + activity.entities.append(Entity(type="streaminfo", **activity.channel_data)) + + if self._citations and len(self._citations) > 0 and not self._ended: + # Filter out the citations unused in content. + curr_citations = CitationUtil.get_used_citations( + self._message, self._citations + ) + if curr_citations: + activity.entities.append( + Entity( + type="https://schema.org/Message", + schema_type="Message", + context="https://schema.org", + id="", + citation=curr_citations, + ) + ) + + # Add in Powered by AI feature flags + if self._ended: + if self._enable_feedback_loop and self._feedback_loop_type: + if not activity.channel_data: + activity.channel_data = {} + activity.channel_data["feedbackLoop"] = { + "type": self._feedback_loop_type + } + else: + if not activity.channel_data: + activity.channel_data = {} + activity.channel_data["feedbackLoopEnabled"] = ( + self._enable_feedback_loop + ) + + # Add in Generated by AI + if self._enable_generated_by_ai_label: + add_ai_to_activity(activity, self._citations, self._sensitivity_label) + + # Send activity + response = await self._context.send_activity(activity) + await asyncio.sleep(1.5) # Equivalent to setTimeout in the TypeScript code + + # Save assigned stream ID + if not self._stream_id and response: + self._stream_id = response.id diff --git a/test_samples/app_style/streaming_agent.py b/test_samples/app_style/streaming_agent.py new file mode 100644 index 00000000..36b0ebd8 --- /dev/null +++ b/test_samples/app_style/streaming_agent.py @@ -0,0 +1,112 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +from os import environ +import re +import sys +import traceback + +from dotenv import load_dotenv +from microsoft.agents.builder.app import AgentApplication, TurnState +from microsoft.agents.builder.app.oauth import Authorization +from microsoft.agents.hosting.aiohttp import CloudAdapter, Citation +from microsoft.agents.authentication.msal import MsalConnectionManager + +from microsoft.agents.builder import ( + TurnContext, + MessageFactory, +) +from microsoft.agents.storage import MemoryStorage +from microsoft.agents.core import load_configuration_from_env +from microsoft.agents.core.models import Activity, ActivityTypes, SensitivityUsageInfo + +from shared import start_server + +load_dotenv() + +agents_sdk_config = load_configuration_from_env(environ) + +STORAGE = MemoryStorage() +CONNECTION_MANAGER = MsalConnectionManager(**agents_sdk_config) +ADAPTER = CloudAdapter(connection_manager=CONNECTION_MANAGER) +AUTHORIZATION = Authorization(STORAGE, CONNECTION_MANAGER, **agents_sdk_config) + +AGENT_APP = AgentApplication[TurnState]( + storage=STORAGE, adapter=ADAPTER, authorization=AUTHORIZATION, **agents_sdk_config +) + + +@AGENT_APP.activity(ActivityTypes.invoke) +async def invoke(context: TurnContext, state: TurnState) -> str: + """ + Internal method to process template expansion or function invocation. + """ + invoke_response = Activity( + type=ActivityTypes.invoke_response, value={"status": 200} + ) + print(f"Invoke activity received: {context.activity}") + await context.send_activity(invoke_response) + + +@AGENT_APP.conversation_update("membersAdded") +async def on_members_added(context: TurnContext, _state: TurnState): + await context.send_activity( + "Welcome to the Streaming sample, send a message to see the echo feature in action." + ) + return True + + +@AGENT_APP.activity("message") +async def on_message(context: TurnContext, state: TurnState): + context.streaming_response.set_feedback_loop(True) + context.streaming_response.set_sensitivity_label( + SensitivityUsageInfo( + type="https://schema.org/Message", + schema_type="CreativeWork", + name="Internal", + ) + ) + context.streaming_response.set_generated_by_ai_label(True) + context.streaming_response.queue_informative_update("starting streaming response") + await asyncio.sleep(1) + + for i in range(5): + print(f"Streaming chunk {i + 1}") + context.streaming_response.queue_text_chunk(f"part [{i + 1}] ") + await asyncio.sleep(i * 0.5) + + context.streaming_response.queue_text_chunk( + "This is the final message part. [doc1] and [doc2]" + ) + context.streaming_response.set_citations( + [ + Citation(title="Citation1", content="file", filepath="", url="file:////"), + Citation( + title="Citation2", + content="loooonger content", + filepath="", + url="file:////", + ), + ] + ) + + await context.streaming_response.end_stream() + + +@AGENT_APP.error +async def on_error(context: TurnContext, error: Exception): + # This check writes out errors to console log .vs. app insights. + # NOTE: In production environment, you should consider logging this to Azure + # application insights. + print(f"\n [on_turn_error] unhandled error: {error}", file=sys.stderr) + traceback.print_exc() + + # Send a message to the user + await context.send_activity("The bot encountered an error or bug.") + + +start_server( + agent_application=AGENT_APP, + auth_configuration=CONNECTION_MANAGER.get_default_connection_configuration(), +) From ba25de95a37229e8963aa0d31ce862a9af2ffa18 Mon Sep 17 00:00:00 2001 From: Axel Suarez Martinez Date: Fri, 11 Jul 2025 19:18:39 -0500 Subject: [PATCH 2/7] Supporting webchat and non-streaming channels, few items for backlog --- .../agents/builder/message_factory.py | 16 +- .../microsoft/agents/builder/turn_context.py | 5 +- .../connector/client/connector_client.py | 5 +- .../microsoft/agents/core/models/activity.py | 2 +- .../agents/core/models/delivery_modes.py | 1 + .../app/streaming/streaming_response.py | 204 +++++++++++++++++- test_samples/app_style/streaming_agent.py | 2 +- 7 files changed, 213 insertions(+), 22 deletions(-) diff --git a/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/message_factory.py b/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/message_factory.py index bfafa491..07d7f6e1 100644 --- a/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/message_factory.py +++ b/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/message_factory.py @@ -19,7 +19,7 @@ def attachment_activity( attachments: list[Attachment], text: str = None, speak: str = None, - input_hint: InputHints | str = InputHints.accepting_input, + input_hint: InputHints | str = None, ) -> Activity: message = Activity( type=ActivityTypes.message, @@ -45,7 +45,7 @@ class MessageFactory: def text( text: str, speak: str = None, - input_hint: InputHints | str = InputHints.accepting_input, + input_hint: InputHints | str = None, ) -> Activity: """ Returns a simple text message. @@ -59,9 +59,11 @@ def text( :param input_hint: :return: """ - message = Activity(type=ActivityTypes.message, text=text, input_hint=input_hint) + message = Activity(type=ActivityTypes.message, text=text) if speak: message.speak = speak + if input_hint: + message.input_hint = input_hint return message @@ -70,7 +72,7 @@ def suggested_actions( actions: list[CardAction], text: str = None, speak: str = None, - input_hint: InputHints | str = InputHints.accepting_input, + input_hint: InputHints | str = None, ) -> Activity: """ Returns a message that includes a set of suggested actions and optional text. @@ -89,13 +91,13 @@ def suggested_actions( :return: """ actions = SuggestedActions(actions=actions) - message = Activity( - type=ActivityTypes.message, input_hint=input_hint, suggested_actions=actions - ) + message = Activity(type=ActivityTypes.message, suggested_actions=actions) if text: message.text = text if speak: message.speak = speak + if input_hint: + message.input_hint = input_hint return message @staticmethod diff --git a/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py b/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py index ef4a32ac..0e950a42 100644 --- a/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py +++ b/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py @@ -188,10 +188,11 @@ async def send_activity( activity_or_text = Activity( type=ActivityTypes.message, text=activity_or_text, - input_hint=input_hint or InputHints.accepting_input, ) if speak: activity_or_text.speak = speak + if input_hint: + activity_or_text.input_hint = input_hint result = await self.send_activities([activity_or_text]) return result[0] if result else None @@ -209,8 +210,6 @@ def activity_validator(activity: Activity) -> Activity: if activity.type != ActivityTypes.trace: nonlocal sent_non_trace_activity sent_non_trace_activity = True - if not activity.input_hint: - activity.input_hint = "acceptingInput" activity.id = None return activity diff --git a/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py b/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py index cd09852e..cbb0acc3 100644 --- a/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py +++ b/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py @@ -179,11 +179,12 @@ async def reply_to_activity( by_alias=True, exclude_unset=True, exclude_none=True, mode="json" ), ) as response: + data = await response.json() if response.content_length else {} + if response.status >= 400: - logger.error(f"Error replying to activity: {response.status}") + logger.error(f"Error replying to activity: {data or response.status}") response.raise_for_status() - data = await response.json() if response.content_length else {} logger.info( f"Reply to conversation/activity: {data.get('id')}, {activity_id}" ) diff --git a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py index 67a359ad..44593f2c 100644 --- a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py +++ b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py @@ -144,7 +144,7 @@ class Activity(AgentsModel): input_hint: NonEmptyString = None summary: NonEmptyString = None suggested_actions: SuggestedActions = None - attachments: list[Attachment] = None + attachments: Optional[list[Attachment]] = None entities: list[SerializeAsAny[Entity]] = None channel_data: object = None action: NonEmptyString = None diff --git a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py index 9be42371..856601b5 100644 --- a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py +++ b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py @@ -6,3 +6,4 @@ class DeliveryModes(str, Enum): notification = "notification" expect_replies = "expectReplies" ephemeral = "ephemeral" + stream = "stream" diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py index 0d888a12..00e13a18 100644 --- a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py @@ -3,6 +3,7 @@ import asyncio import logging +import uuid from typing import List, Optional, Callable, Literal, TYPE_CHECKING from dataclasses import dataclass @@ -13,6 +14,8 @@ ClientCitation, SensitivityUsageInfo, add_ai_to_activity, + Channels, + DeliveryModes, ) if TYPE_CHECKING: @@ -52,9 +55,7 @@ class StreamingResponse: This class is used to send a series of updates to the client in a single response. The expected sequence of calls is: - `queue_informative_update()`, `queue_text_chunk()`, `queue_text_chunk()`, ..., `end_stream()`. - - Once `end_stream()` is called, the stream is considered ended and no further updates can be sent. + `queue_informative_update()`, `queue_text_chunk()`, `queue_text_chunk()`, ..., `end_stream()`. Once `end_stream()` is called, the stream is considered ended and no further updates can be sent. """ def __init__(self, context: "TurnContext"): @@ -70,11 +71,16 @@ def __init__(self, context: "TurnContext"): self._message = "" self._attachments: Optional[List[Attachment]] = None self._ended = False + self._cancelled = False + self._informative_sent = False + self._message_updated = False + self._final_message = None # Queue for outgoing activities self._queue: List[Callable[[], Activity]] = [] self._queue_sync: Optional[asyncio.Task] = None self._chunk_queued = False + self._timer_task: Optional[asyncio.Task] = None # Powered by AI feature flags self._enable_feedback_loop = False @@ -83,6 +89,14 @@ def __init__(self, context: "TurnContext"): self._citations: Optional[List[ClientCitation]] = [] self._sensitivity_label: Optional[SensitivityUsageInfo] = None + # Channel-specific settings + self._is_teams_channel = False + self._interval = 100 # Default interval in milliseconds + self._is_streaming_channel = False + + # Initialize channel-specific settings + self._set_defaults(context) + @property def stream_id(self) -> Optional[str]: """ @@ -101,6 +115,23 @@ def updates_sent(self) -> int: """Gets the number of updates sent for the stream.""" return self._next_sequence - 1 + @property + def final_message(self) -> Optional[Activity]: + """ + Gets the final message that will be sent to the client. + This is only set after `end_stream()` is called. + """ + return self._final_message + + @final_message.setter + def set_final_message(self, value: Activity) -> None: + """Sets the final message to be sent to the client. + + Args: + value: The final message to send. + """ + self._final_message = value + def queue_informative_update(self, text: str) -> None: """ Queues an informative update to be sent to the client. @@ -108,9 +139,14 @@ def queue_informative_update(self, text: str) -> None: Args: text: Text of the update to send. """ + if not self._is_streaming_channel: + return + if self._ended: raise RuntimeError("The stream has already ended.") + self._informative_sent = True + # Queue a typing activity def create_activity(): activity = Activity( @@ -139,15 +175,29 @@ def queue_text_chunk( text: Partial text of the message to send. citations: Citations to be included in the message. """ + if not text or self._cancelled: + return + if self._ended: raise RuntimeError("The stream has already ended.") + if not self._informative_sent and self._is_teams_channel: + raise RuntimeError( + "Teams requires calling queue_informative_update() before queue_text_chunk()" + ) + # Update full message text self._message += text # If there are citations, modify the content so that the sources are numbers instead of [doc1], [doc2], etc. self._message = CitationUtil.format_citations_response(self._message) + self._message_updated = True + + # Start stream if we're on a streaming channel + if self._is_streaming_channel: + self._start_stream() + # Queue the next chunk self._queue_next_chunk() @@ -155,16 +205,36 @@ async def end_stream(self) -> None: """ Ends the stream by sending the final message to the client. """ + if not self._is_streaming_channel: + if self._ended: + raise RuntimeError("The stream has already ended.") + + self._ended = True + + # Timer isn't running for non-streaming channels. Just send the Message buffer as a message. + if self.updates_sent > 0 or self._message or self._final_message: + await self._send_final_message() + return + if self._ended: - raise RuntimeError("The stream has already ended.") + return - # Queue final message self._ended = True - self._queue_next_chunk() - # Wait for the queue to drain + if self.updates_sent == 0 or self._cancelled: + # Nothing was queued. Nothing to "end". + return + + # Stop the streaming timer + self._stop_stream() + + # Wait for the queue to drain and send final message await self.wait_for_queue() + # TODO: NEED to revisit final message logic + # if self.updates_sent > 0 or self._final_message: + # await self._send_final_message() + def set_attachments(self, attachments: List[Attachment]) -> None: """ Sets the attachments to attach to the final chunk. @@ -356,6 +426,7 @@ async def _send_activity(self, activity: Activity) -> None: # Add in Powered by AI feature flags if self._ended: + # TODO: fix feedback loop if self._enable_feedback_loop and self._feedback_loop_type: if not activity.channel_data: activity.channel_data = {} @@ -373,10 +444,127 @@ async def _send_activity(self, activity: Activity) -> None: if self._enable_generated_by_ai_label: add_ai_to_activity(activity, self._citations, self._sensitivity_label) + if self._is_teams_channel: + activity.channel_data = None + # Send activity response = await self._context.send_activity(activity) - await asyncio.sleep(1.5) # Equivalent to setTimeout in the TypeScript code # Save assigned stream ID if not self._stream_id and response: self._stream_id = response.id + + @property + def is_streaming_channel(self) -> bool: + """ + Indicate if the current channel supports intermediate messages. + + Channels that don't support intermediate messages will buffer + text, and send a normal final message when end_stream is called. + """ + return self._is_streaming_channel + + @property + def interval(self) -> int: + """ + The interval in milliseconds at which intermediate messages are sent. + + Teams default: 1000 + WebChat default: 500 + Other channels: 100 + """ + return self._interval + + @interval.setter + def interval(self, value: int) -> None: + """Set the interval for sending intermediate messages.""" + self._interval = value + + def is_stream_started(self) -> bool: + """Check if the streaming timer has been started.""" + return self._timer_task is not None and not self._timer_task.done() + + def _set_defaults(self, context: "TurnContext") -> None: + """Set channel-specific defaults based on the turn context.""" + channel_id = getattr(context.activity, "channel_id", None) + delivery_mode = getattr(context.activity, "delivery_mode", None) + + self._is_teams_channel = channel_id == Channels.ms_teams + + if self._is_teams_channel: + # Teams MUST use the Activity.Id returned from the first Informative message for + # subsequent intermediate messages. Do not set StreamId here. + self._interval = 1000 + self._is_streaming_channel = True + elif channel_id == Channels.webchat: + self._interval = 500 + self._is_streaming_channel = True + # WebChat will use whatever StreamId is created + self._stream_id = str(uuid.uuid4()) + else: + # Support streaming for DeliveryMode.Stream + self._is_streaming_channel = delivery_mode == DeliveryModes.stream + self._interval = 100 + + def _start_stream(self) -> None: + """Start the streaming timer if not already started.""" + if self._timer_task is None and self._is_streaming_channel: + self._timer_task = asyncio.create_task(self._send_intermediate_messages()) + + def _stop_stream(self) -> None: + """Stop the streaming timer.""" + if self._timer_task and not self._timer_task.done(): + self._timer_task.cancel() + self._timer_task = None + + async def _send_intermediate_messages(self) -> None: + """Timer task to send intermediate messages at intervals.""" + try: + while not self._ended and not self._cancelled: + await asyncio.sleep(self._interval / 1000.0) # Convert ms to seconds + + if self._message_updated: + self._queue_next_chunk() + self._message_updated = False + + # Process any queued activities + await self._drain_queue() + + except asyncio.CancelledError: + pass + + async def _send_final_message(self) -> None: + """Send the final message with all accumulated content.""" + activity = self.final_message or Activity( + type="message", + text=self._message or "No text was streamed", + attachments=self._attachments, + entities=[], + ) + + if self._is_streaming_channel: + channel_data = { + "streamType": "final", + # "streamSequence": self._next_sequence, + "streamResult": "success" if self._message else "error", + "streamId": self._stream_id, + } + + activity.entities.append(Entity(type="streaminfo", **channel_data)) + + if not self._is_teams_channel: + activity.channel_data = channel_data + + self._next_sequence += 1 + + # Add AI entity if enabled + if self._enable_generated_by_ai_label: + used_citations = None + if self._citations: + used_citations = CitationUtil.get_used_citations( + self._message, self._citations + ) + add_ai_to_activity(activity, used_citations, self._sensitivity_label) + + # Send the final activity + await self._context.send_activity(activity) diff --git a/test_samples/app_style/streaming_agent.py b/test_samples/app_style/streaming_agent.py index 36b0ebd8..35d56df3 100644 --- a/test_samples/app_style/streaming_agent.py +++ b/test_samples/app_style/streaming_agent.py @@ -52,7 +52,7 @@ async def invoke(context: TurnContext, state: TurnState) -> str: @AGENT_APP.conversation_update("membersAdded") async def on_members_added(context: TurnContext, _state: TurnState): await context.send_activity( - "Welcome to the Streaming sample, send a message to see the echo feature in action." + "Welcome to the Streaming sample, send a message to see the streaming feature in action." ) return True From 8f9a89977151e391f7e2d0f71fc3e788be804aca Mon Sep 17 00:00:00 2001 From: Axel Suarez Date: Mon, 14 Jul 2025 13:20:00 -0700 Subject: [PATCH 3/7] Revert "Supporting webchat and non-streaming channels, few items for backlog" This reverts commit ba25de95a37229e8963aa0d31ce862a9af2ffa18. --- .../agents/builder/message_factory.py | 16 +- .../microsoft/agents/builder/turn_context.py | 5 +- .../connector/client/connector_client.py | 5 +- .../microsoft/agents/core/models/activity.py | 2 +- .../agents/core/models/delivery_modes.py | 1 - .../app/streaming/streaming_response.py | 204 +----------------- test_samples/app_style/streaming_agent.py | 2 +- 7 files changed, 22 insertions(+), 213 deletions(-) diff --git a/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/message_factory.py b/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/message_factory.py index 07d7f6e1..bfafa491 100644 --- a/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/message_factory.py +++ b/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/message_factory.py @@ -19,7 +19,7 @@ def attachment_activity( attachments: list[Attachment], text: str = None, speak: str = None, - input_hint: InputHints | str = None, + input_hint: InputHints | str = InputHints.accepting_input, ) -> Activity: message = Activity( type=ActivityTypes.message, @@ -45,7 +45,7 @@ class MessageFactory: def text( text: str, speak: str = None, - input_hint: InputHints | str = None, + input_hint: InputHints | str = InputHints.accepting_input, ) -> Activity: """ Returns a simple text message. @@ -59,11 +59,9 @@ def text( :param input_hint: :return: """ - message = Activity(type=ActivityTypes.message, text=text) + message = Activity(type=ActivityTypes.message, text=text, input_hint=input_hint) if speak: message.speak = speak - if input_hint: - message.input_hint = input_hint return message @@ -72,7 +70,7 @@ def suggested_actions( actions: list[CardAction], text: str = None, speak: str = None, - input_hint: InputHints | str = None, + input_hint: InputHints | str = InputHints.accepting_input, ) -> Activity: """ Returns a message that includes a set of suggested actions and optional text. @@ -91,13 +89,13 @@ def suggested_actions( :return: """ actions = SuggestedActions(actions=actions) - message = Activity(type=ActivityTypes.message, suggested_actions=actions) + message = Activity( + type=ActivityTypes.message, input_hint=input_hint, suggested_actions=actions + ) if text: message.text = text if speak: message.speak = speak - if input_hint: - message.input_hint = input_hint return message @staticmethod diff --git a/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py b/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py index 0e950a42..ef4a32ac 100644 --- a/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py +++ b/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py @@ -188,11 +188,10 @@ async def send_activity( activity_or_text = Activity( type=ActivityTypes.message, text=activity_or_text, + input_hint=input_hint or InputHints.accepting_input, ) if speak: activity_or_text.speak = speak - if input_hint: - activity_or_text.input_hint = input_hint result = await self.send_activities([activity_or_text]) return result[0] if result else None @@ -210,6 +209,8 @@ def activity_validator(activity: Activity) -> Activity: if activity.type != ActivityTypes.trace: nonlocal sent_non_trace_activity sent_non_trace_activity = True + if not activity.input_hint: + activity.input_hint = "acceptingInput" activity.id = None return activity diff --git a/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py b/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py index cbb0acc3..cd09852e 100644 --- a/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py +++ b/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py @@ -179,12 +179,11 @@ async def reply_to_activity( by_alias=True, exclude_unset=True, exclude_none=True, mode="json" ), ) as response: - data = await response.json() if response.content_length else {} - if response.status >= 400: - logger.error(f"Error replying to activity: {data or response.status}") + logger.error(f"Error replying to activity: {response.status}") response.raise_for_status() + data = await response.json() if response.content_length else {} logger.info( f"Reply to conversation/activity: {data.get('id')}, {activity_id}" ) diff --git a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py index 44593f2c..67a359ad 100644 --- a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py +++ b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py @@ -144,7 +144,7 @@ class Activity(AgentsModel): input_hint: NonEmptyString = None summary: NonEmptyString = None suggested_actions: SuggestedActions = None - attachments: Optional[list[Attachment]] = None + attachments: list[Attachment] = None entities: list[SerializeAsAny[Entity]] = None channel_data: object = None action: NonEmptyString = None diff --git a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py index 856601b5..9be42371 100644 --- a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py +++ b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py @@ -6,4 +6,3 @@ class DeliveryModes(str, Enum): notification = "notification" expect_replies = "expectReplies" ephemeral = "ephemeral" - stream = "stream" diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py index 00e13a18..0d888a12 100644 --- a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py @@ -3,7 +3,6 @@ import asyncio import logging -import uuid from typing import List, Optional, Callable, Literal, TYPE_CHECKING from dataclasses import dataclass @@ -14,8 +13,6 @@ ClientCitation, SensitivityUsageInfo, add_ai_to_activity, - Channels, - DeliveryModes, ) if TYPE_CHECKING: @@ -55,7 +52,9 @@ class StreamingResponse: This class is used to send a series of updates to the client in a single response. The expected sequence of calls is: - `queue_informative_update()`, `queue_text_chunk()`, `queue_text_chunk()`, ..., `end_stream()`. Once `end_stream()` is called, the stream is considered ended and no further updates can be sent. + `queue_informative_update()`, `queue_text_chunk()`, `queue_text_chunk()`, ..., `end_stream()`. + + Once `end_stream()` is called, the stream is considered ended and no further updates can be sent. """ def __init__(self, context: "TurnContext"): @@ -71,16 +70,11 @@ def __init__(self, context: "TurnContext"): self._message = "" self._attachments: Optional[List[Attachment]] = None self._ended = False - self._cancelled = False - self._informative_sent = False - self._message_updated = False - self._final_message = None # Queue for outgoing activities self._queue: List[Callable[[], Activity]] = [] self._queue_sync: Optional[asyncio.Task] = None self._chunk_queued = False - self._timer_task: Optional[asyncio.Task] = None # Powered by AI feature flags self._enable_feedback_loop = False @@ -89,14 +83,6 @@ def __init__(self, context: "TurnContext"): self._citations: Optional[List[ClientCitation]] = [] self._sensitivity_label: Optional[SensitivityUsageInfo] = None - # Channel-specific settings - self._is_teams_channel = False - self._interval = 100 # Default interval in milliseconds - self._is_streaming_channel = False - - # Initialize channel-specific settings - self._set_defaults(context) - @property def stream_id(self) -> Optional[str]: """ @@ -115,23 +101,6 @@ def updates_sent(self) -> int: """Gets the number of updates sent for the stream.""" return self._next_sequence - 1 - @property - def final_message(self) -> Optional[Activity]: - """ - Gets the final message that will be sent to the client. - This is only set after `end_stream()` is called. - """ - return self._final_message - - @final_message.setter - def set_final_message(self, value: Activity) -> None: - """Sets the final message to be sent to the client. - - Args: - value: The final message to send. - """ - self._final_message = value - def queue_informative_update(self, text: str) -> None: """ Queues an informative update to be sent to the client. @@ -139,14 +108,9 @@ def queue_informative_update(self, text: str) -> None: Args: text: Text of the update to send. """ - if not self._is_streaming_channel: - return - if self._ended: raise RuntimeError("The stream has already ended.") - self._informative_sent = True - # Queue a typing activity def create_activity(): activity = Activity( @@ -175,29 +139,15 @@ def queue_text_chunk( text: Partial text of the message to send. citations: Citations to be included in the message. """ - if not text or self._cancelled: - return - if self._ended: raise RuntimeError("The stream has already ended.") - if not self._informative_sent and self._is_teams_channel: - raise RuntimeError( - "Teams requires calling queue_informative_update() before queue_text_chunk()" - ) - # Update full message text self._message += text # If there are citations, modify the content so that the sources are numbers instead of [doc1], [doc2], etc. self._message = CitationUtil.format_citations_response(self._message) - self._message_updated = True - - # Start stream if we're on a streaming channel - if self._is_streaming_channel: - self._start_stream() - # Queue the next chunk self._queue_next_chunk() @@ -205,36 +155,16 @@ async def end_stream(self) -> None: """ Ends the stream by sending the final message to the client. """ - if not self._is_streaming_channel: - if self._ended: - raise RuntimeError("The stream has already ended.") - - self._ended = True - - # Timer isn't running for non-streaming channels. Just send the Message buffer as a message. - if self.updates_sent > 0 or self._message or self._final_message: - await self._send_final_message() - return - if self._ended: - return + raise RuntimeError("The stream has already ended.") + # Queue final message self._ended = True + self._queue_next_chunk() - if self.updates_sent == 0 or self._cancelled: - # Nothing was queued. Nothing to "end". - return - - # Stop the streaming timer - self._stop_stream() - - # Wait for the queue to drain and send final message + # Wait for the queue to drain await self.wait_for_queue() - # TODO: NEED to revisit final message logic - # if self.updates_sent > 0 or self._final_message: - # await self._send_final_message() - def set_attachments(self, attachments: List[Attachment]) -> None: """ Sets the attachments to attach to the final chunk. @@ -426,7 +356,6 @@ async def _send_activity(self, activity: Activity) -> None: # Add in Powered by AI feature flags if self._ended: - # TODO: fix feedback loop if self._enable_feedback_loop and self._feedback_loop_type: if not activity.channel_data: activity.channel_data = {} @@ -444,127 +373,10 @@ async def _send_activity(self, activity: Activity) -> None: if self._enable_generated_by_ai_label: add_ai_to_activity(activity, self._citations, self._sensitivity_label) - if self._is_teams_channel: - activity.channel_data = None - # Send activity response = await self._context.send_activity(activity) + await asyncio.sleep(1.5) # Equivalent to setTimeout in the TypeScript code # Save assigned stream ID if not self._stream_id and response: self._stream_id = response.id - - @property - def is_streaming_channel(self) -> bool: - """ - Indicate if the current channel supports intermediate messages. - - Channels that don't support intermediate messages will buffer - text, and send a normal final message when end_stream is called. - """ - return self._is_streaming_channel - - @property - def interval(self) -> int: - """ - The interval in milliseconds at which intermediate messages are sent. - - Teams default: 1000 - WebChat default: 500 - Other channels: 100 - """ - return self._interval - - @interval.setter - def interval(self, value: int) -> None: - """Set the interval for sending intermediate messages.""" - self._interval = value - - def is_stream_started(self) -> bool: - """Check if the streaming timer has been started.""" - return self._timer_task is not None and not self._timer_task.done() - - def _set_defaults(self, context: "TurnContext") -> None: - """Set channel-specific defaults based on the turn context.""" - channel_id = getattr(context.activity, "channel_id", None) - delivery_mode = getattr(context.activity, "delivery_mode", None) - - self._is_teams_channel = channel_id == Channels.ms_teams - - if self._is_teams_channel: - # Teams MUST use the Activity.Id returned from the first Informative message for - # subsequent intermediate messages. Do not set StreamId here. - self._interval = 1000 - self._is_streaming_channel = True - elif channel_id == Channels.webchat: - self._interval = 500 - self._is_streaming_channel = True - # WebChat will use whatever StreamId is created - self._stream_id = str(uuid.uuid4()) - else: - # Support streaming for DeliveryMode.Stream - self._is_streaming_channel = delivery_mode == DeliveryModes.stream - self._interval = 100 - - def _start_stream(self) -> None: - """Start the streaming timer if not already started.""" - if self._timer_task is None and self._is_streaming_channel: - self._timer_task = asyncio.create_task(self._send_intermediate_messages()) - - def _stop_stream(self) -> None: - """Stop the streaming timer.""" - if self._timer_task and not self._timer_task.done(): - self._timer_task.cancel() - self._timer_task = None - - async def _send_intermediate_messages(self) -> None: - """Timer task to send intermediate messages at intervals.""" - try: - while not self._ended and not self._cancelled: - await asyncio.sleep(self._interval / 1000.0) # Convert ms to seconds - - if self._message_updated: - self._queue_next_chunk() - self._message_updated = False - - # Process any queued activities - await self._drain_queue() - - except asyncio.CancelledError: - pass - - async def _send_final_message(self) -> None: - """Send the final message with all accumulated content.""" - activity = self.final_message or Activity( - type="message", - text=self._message or "No text was streamed", - attachments=self._attachments, - entities=[], - ) - - if self._is_streaming_channel: - channel_data = { - "streamType": "final", - # "streamSequence": self._next_sequence, - "streamResult": "success" if self._message else "error", - "streamId": self._stream_id, - } - - activity.entities.append(Entity(type="streaminfo", **channel_data)) - - if not self._is_teams_channel: - activity.channel_data = channel_data - - self._next_sequence += 1 - - # Add AI entity if enabled - if self._enable_generated_by_ai_label: - used_citations = None - if self._citations: - used_citations = CitationUtil.get_used_citations( - self._message, self._citations - ) - add_ai_to_activity(activity, used_citations, self._sensitivity_label) - - # Send the final activity - await self._context.send_activity(activity) diff --git a/test_samples/app_style/streaming_agent.py b/test_samples/app_style/streaming_agent.py index 35d56df3..36b0ebd8 100644 --- a/test_samples/app_style/streaming_agent.py +++ b/test_samples/app_style/streaming_agent.py @@ -52,7 +52,7 @@ async def invoke(context: TurnContext, state: TurnState) -> str: @AGENT_APP.conversation_update("membersAdded") async def on_members_added(context: TurnContext, _state: TurnState): await context.send_activity( - "Welcome to the Streaming sample, send a message to see the streaming feature in action." + "Welcome to the Streaming sample, send a message to see the echo feature in action." ) return True From 94a0996301cefc58107e28f8c4b1c04f1db7bab3 Mon Sep 17 00:00:00 2001 From: Axel Suarez Date: Mon, 14 Jul 2025 16:54:44 -0700 Subject: [PATCH 4/7] Reverting to just Entities, scenario broken --- .../agents/hosting/aiohttp/__init__.py | 2 - .../agents/hosting/aiohttp/app/__init__.py | 2 - .../hosting/aiohttp/app/streaming/__init__.py | 3 +- .../app/streaming/streaming_response.py | 95 +++++++++---------- 4 files changed, 44 insertions(+), 58 deletions(-) diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/__init__.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/__init__.py index 315cc0b1..ed572337 100644 --- a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/__init__.py +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/__init__.py @@ -10,7 +10,6 @@ Citation, CitationUtil, StreamingResponse, - StreamingChannelData, ) __all__ = [ @@ -23,5 +22,4 @@ "Citation", "CitationUtil", "StreamingResponse", - "StreamingChannelData", ] diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/__init__.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/__init__.py index be107941..8216be63 100644 --- a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/__init__.py +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/__init__.py @@ -5,12 +5,10 @@ Citation, CitationUtil, StreamingResponse, - StreamingChannelData, ) __all__ = [ "Citation", "CitationUtil", "StreamingResponse", - "StreamingChannelData", ] diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/__init__.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/__init__.py index f46c8760..4cd61f38 100644 --- a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/__init__.py +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/__init__.py @@ -3,11 +3,10 @@ from .citation import Citation from .citation_util import CitationUtil -from .streaming_response import StreamingResponse, StreamingChannelData +from .streaming_response import StreamingResponse __all__ = [ "Citation", "CitationUtil", "StreamingResponse", - "StreamingChannelData", ] diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py index 0d888a12..7158a4bd 100644 --- a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py @@ -24,27 +24,6 @@ logger = logging.getLogger(__name__) -@dataclass -class StreamingChannelData: - """ - Structure of the outgoing channelData field for streaming responses. - - The expected sequence of streamTypes is: - 'informative', 'streaming', 'streaming', ..., 'final'. - - Once a 'final' message is sent, the stream is considered ended. - """ - - stream_type: Literal["informative", "streaming", "final"] - """The type of message being sent.""" - - stream_sequence: int - """Sequence number of the message in the stream. Starts at 1 for the first message.""" - - stream_id: Optional[str] = None - """ID of the stream. Assigned after the initial update is sent.""" - - class StreamingResponse: """ A helper class for streaming responses to the client. @@ -116,10 +95,13 @@ def create_activity(): activity = Activity( type="typing", text=text, - channel_data={ - "streamType": "informative", - "streamSequence": self._next_sequence, - }, + entities=[ + Entity( + type="streaminfo", + stream_type="informative", + stream_sequence=self._next_sequence, + ) + ], ) self._next_sequence += 1 return activity @@ -273,20 +255,26 @@ def create_activity(): type="message", text=self._message or "end stream response", attachments=self._attachments or [], - channel_data={ - "streamType": "final", - "streamSequence": self._next_sequence, - }, + entities=[ + Entity( + type="streaminfo", + stream_type="final", + stream_sequence=self._next_sequence, + ) + ], ) else: # Send typing activity activity = Activity( type="typing", text=self._message, - channel_data={ - "streamType": "streaming", - "streamSequence": self._next_sequence, - }, + entities=[ + Entity( + type="streaminfo", + stream_type="streaming", + stream_sequence=self._next_sequence, + ) + ], ) self._next_sequence += 1 return activity @@ -326,17 +314,27 @@ async def _send_activity(self, activity: Activity) -> None: Args: activity: The activity to send. """ - # Set activity ID to the assigned stream ID - if self._stream_id: - activity.id = self._stream_id - if not activity.channel_data: - activity.channel_data = {} - activity.channel_data["streamId"] = self._stream_id + + streaminfo_entity = None if not activity.entities: - activity.entities = [] + streaminfo_entity = Entity(type="streaminfo") + activity.entities = [streaminfo_entity] + else: + for entity in activity.entities: + if hasattr(entity, "type") and entity.type == "streaminfo": + streaminfo_entity = entity + break + + if not streaminfo_entity: + # If no streaminfo entity exists, create one + streaminfo_entity = Entity(type="streaminfo") + activity.entities.append(streaminfo_entity) - activity.entities.append(Entity(type="streaminfo", **activity.channel_data)) + # Set activity ID to the assigned stream ID + if self._stream_id: + activity.id = self._stream_id + streaminfo_entity.stream_id = self._stream_id if self._citations and len(self._citations) > 0 and not self._ended: # Filter out the citations unused in content. @@ -357,18 +355,11 @@ async def _send_activity(self, activity: Activity) -> None: # Add in Powered by AI feature flags if self._ended: if self._enable_feedback_loop and self._feedback_loop_type: - if not activity.channel_data: - activity.channel_data = {} - activity.channel_data["feedbackLoop"] = { - "type": self._feedback_loop_type - } + # Add feedback loop to streaminfo entity + streaminfo_entity.feedback_loop = {"type": self._feedback_loop_type} else: - if not activity.channel_data: - activity.channel_data = {} - activity.channel_data["feedbackLoopEnabled"] = ( - self._enable_feedback_loop - ) - + # Add feedback loop enabled to streaminfo entity + streaminfo_entity.feedback_loop_enabled = self._enable_feedback_loop # Add in Generated by AI if self._enable_generated_by_ai_label: add_ai_to_activity(activity, self._citations, self._sensitivity_label) From ebac5a9e861e50d9cedd3b65e3ed7f73a8aea9db Mon Sep 17 00:00:00 2001 From: Axel Suarez Date: Tue, 15 Jul 2025 16:46:44 -0700 Subject: [PATCH 5/7] Fix for Entity serialization --- .../connector/client/connector_client.py | 12 +++++------ .../microsoft/agents/core/models/entity.py | 20 ++++++++++++++++++- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py b/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py index cd09852e..affcd336 100644 --- a/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py +++ b/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py @@ -18,9 +18,6 @@ ConversationsResult, PagedMembersResult, ) -from microsoft.agents.authorization import ( - AccessTokenProviderBase, -) from microsoft.agents.connector import ConnectorClientBase from ..attachments_base import AttachmentsBase from ..conversations_base import ConversationsBase @@ -179,15 +176,16 @@ async def reply_to_activity( by_alias=True, exclude_unset=True, exclude_none=True, mode="json" ), ) as response: + result = await response.json() if response.content_length else {} + if response.status >= 400: - logger.error(f"Error replying to activity: {response.status}") + logger.error(f"Error replying to activity: {result or response.status}") response.raise_for_status() - data = await response.json() if response.content_length else {} logger.info( - f"Reply to conversation/activity: {data.get('id')}, {activity_id}" + f"Reply to conversation/activity: {result.get('id')}, {activity_id}" ) - return ResourceResponse.model_validate(data) + return ResourceResponse.model_validate(result) async def send_to_conversation( self, conversation_id: str, body: Activity diff --git a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/entity.py b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/entity.py index af250a86..08bbf242 100644 --- a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/entity.py +++ b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/entity.py @@ -1,5 +1,8 @@ -from typing import Any +from typing import Any, Optional + +from pydantic import model_serializer, model_validator from .agents_model import AgentsModel, ConfigDict +from pydantic.alias_generators import to_camel, to_snake from ._type_aliases import NonEmptyString @@ -18,3 +21,18 @@ class Entity(AgentsModel): def additional_properties(self) -> dict[str, Any]: """Returns the set of properties that are not None.""" return self.model_extra + + @model_validator(mode="before") + @classmethod + def to_snake_for_all(cls, data): + ret = {to_snake(k): v for k, v in data.items()} + return ret + + @model_serializer(mode="plain") + def to_camel_for_all(self, config): + if config.by_alias: + new_data = {} + for k, v in self: + new_data[to_camel(k)] = v + return new_data + return {k: v for k, v in self} From e51c02b99b269c7c727c52b2750eb4b101d22f01 Mon Sep 17 00:00:00 2001 From: Axel Suarez Date: Wed, 16 Jul 2025 15:07:01 -0700 Subject: [PATCH 6/7] Straming entities only --- .../agents/core/models/delivery_modes.py | 1 + .../app/streaming/streaming_response.py | 30 +++++++++++++++++-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py index 9be42371..856601b5 100644 --- a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py +++ b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py @@ -6,3 +6,4 @@ class DeliveryModes(str, Enum): notification = "notification" expect_replies = "expectReplies" ephemeral = "ephemeral" + stream = "stream" diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py index 7158a4bd..5b1552ee 100644 --- a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py @@ -10,7 +10,9 @@ Activity, Entity, Attachment, + Channels, ClientCitation, + DeliveryModes, SensitivityUsageInfo, add_ai_to_activity, ) @@ -62,6 +64,11 @@ def __init__(self, context: "TurnContext"): self._citations: Optional[List[ClientCitation]] = [] self._sensitivity_label: Optional[SensitivityUsageInfo] = None + # Channel information + self._is_streaming_channel: bool = False + self._interval: float = 0.1 # Default interval for sending updates + self._set_defaults(context) + @property def stream_id(self) -> Optional[str]: """ @@ -87,6 +94,9 @@ def queue_informative_update(self, text: str) -> None: Args: text: Text of the update to send. """ + if not self._is_streaming_channel: + return + if self._ended: raise RuntimeError("The stream has already ended.") @@ -236,6 +246,17 @@ async def wait_for_queue(self) -> None: if self._queue_sync: await self._queue_sync + def _set_defaults(self, context: "TurnContext"): + if context.activity.channel_id == Channels.ms_teams: + self._is_streaming_channel = True + self._interval = 1.0 + elif context.activity.channel_id == Channels.direct_line: + self._is_streaming_channel = True + self._interval = 0.5 + elif context.activity.delivery_mode == DeliveryModes.stream: + self._is_streaming_channel = True + self._interval = 0.1 + def _queue_next_chunk(self) -> None: """ Queues the next chunk of text to be sent to the client. @@ -263,7 +284,7 @@ def create_activity(): ) ], ) - else: + elif self._is_streaming_channel: # Send typing activity activity = Activity( type="typing", @@ -276,6 +297,8 @@ def create_activity(): ) ], ) + else: + return self._next_sequence += 1 return activity @@ -300,7 +323,8 @@ async def _drain_queue(self) -> None: while self._queue: factory = self._queue.pop(0) activity = factory() - await self._send_activity(activity) + if activity: + await self._send_activity(activity) except Exception as err: logger.error(f"Error occurred when sending activity while streaming: {err}") raise @@ -366,7 +390,7 @@ async def _send_activity(self, activity: Activity) -> None: # Send activity response = await self._context.send_activity(activity) - await asyncio.sleep(1.5) # Equivalent to setTimeout in the TypeScript code + await asyncio.sleep(1) # Save assigned stream ID if not self._stream_id and response: From 440e66e2d8f2c857027df49aeb9e975fc5f397fb Mon Sep 17 00:00:00 2001 From: Axel Suarez Date: Wed, 16 Jul 2025 16:06:10 -0700 Subject: [PATCH 7/7] Implementing stopping hte stream --- .../app/streaming/streaming_response.py | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py index 5b1552ee..5d6ed581 100644 --- a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py @@ -46,11 +46,12 @@ def __init__(self, context: "TurnContext"): context: Context for the current turn of conversation with the user. """ self._context = context - self._next_sequence = 1 + self._sequence_number = 1 self._stream_id: Optional[str] = None self._message = "" self._attachments: Optional[List[Attachment]] = None self._ended = False + self._cancelled = False # Queue for outgoing activities self._queue: List[Callable[[], Activity]] = [] @@ -66,6 +67,7 @@ def __init__(self, context: "TurnContext"): # Channel information self._is_streaming_channel: bool = False + self._channel_id: Channels = None self._interval: float = 0.1 # Default interval for sending updates self._set_defaults(context) @@ -85,7 +87,7 @@ def citations(self) -> Optional[List[ClientCitation]]: @property def updates_sent(self) -> int: """Gets the number of updates sent for the stream.""" - return self._next_sequence - 1 + return self._sequence_number - 1 def queue_informative_update(self, text: str) -> None: """ @@ -109,11 +111,11 @@ def create_activity(): Entity( type="streaminfo", stream_type="informative", - stream_sequence=self._next_sequence, + stream_sequence=self._sequence_number, ) ], ) - self._next_sequence += 1 + self._sequence_number += 1 return activity self._queue_activity(create_activity) @@ -131,6 +133,8 @@ def queue_text_chunk( text: Partial text of the message to send. citations: Citations to be included in the message. """ + if self._cancelled: + return if self._ended: raise RuntimeError("The stream has already ended.") @@ -257,6 +261,8 @@ def _set_defaults(self, context: "TurnContext"): self._is_streaming_channel = True self._interval = 0.1 + self._channel_id = context.activity.channel_id + def _queue_next_chunk(self) -> None: """ Queues the next chunk of text to be sent to the client. @@ -280,7 +286,7 @@ def create_activity(): Entity( type="streaminfo", stream_type="final", - stream_sequence=self._next_sequence, + stream_sequence=self._sequence_number, ) ], ) @@ -293,13 +299,13 @@ def create_activity(): Entity( type="streaminfo", stream_type="streaming", - stream_sequence=self._next_sequence, + stream_sequence=self._sequence_number, ) ], ) else: return - self._next_sequence += 1 + self._sequence_number += 1 return activity self._queue_activity(create_activity) @@ -326,8 +332,17 @@ async def _drain_queue(self) -> None: if activity: await self._send_activity(activity) except Exception as err: - logger.error(f"Error occurred when sending activity while streaming: {err}") - raise + if ( + "403" in str(err) + and self._context.activity.channel_id == Channels.ms_teams + ): + logger.warning("Teams channel stopped the stream.") + self._cancelled = True + else: + logger.error( + f"Error occurred when sending activity while streaming: {err}" + ) + raise finally: self._queue_sync = None @@ -390,7 +405,7 @@ async def _send_activity(self, activity: Activity) -> None: # Send activity response = await self._context.send_activity(activity) - await asyncio.sleep(1) + await asyncio.sleep(self._interval) # Save assigned stream ID if not self._stream_id and response: