diff --git a/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py b/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py index d2d49402..ef4a32ac 100644 --- a/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py +++ b/libraries/Builder/microsoft-agents-builder/microsoft/agents/builder/turn_context.py @@ -124,6 +124,25 @@ def services(self): """ return self._services + @property + def streaming_response(self): + """ + Gets a StreamingResponse instance for this turn context. + This allows for streaming partial responses to the client. + """ + # Use lazy import to avoid circular dependency + if not hasattr(self, "_streaming_response"): + try: + from microsoft.agents.hosting.aiohttp.app.streaming import ( + StreamingResponse, + ) + + self._streaming_response = StreamingResponse(self) + except ImportError: + # If the hosting library isn't available, return None + self._streaming_response = None + return self._streaming_response + def get(self, key: str) -> object: if not key or not isinstance(key, str): raise TypeError('"key" must be a valid string.') diff --git a/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py b/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py index 5d09d4a1..affcd336 100644 --- a/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py +++ b/libraries/Client/microsoft-agents-connector/microsoft/agents/connector/client/connector_client.py @@ -18,9 +18,6 @@ ConversationsResult, PagedMembersResult, ) -from microsoft.agents.authorization import ( - AccessTokenProviderBase, -) from microsoft.agents.connector import ConnectorClientBase from ..attachments_base import AttachmentsBase from ..conversations_base import ConversationsBase @@ -175,17 +172,20 @@ async def reply_to_activity( async with self.client.post( url, - json=body.model_dump(by_alias=True, exclude_unset=True, mode="json"), + json=body.model_dump( + by_alias=True, exclude_unset=True, exclude_none=True, mode="json" + ), ) as response: + result = await response.json() if response.content_length else {} + if response.status >= 400: - logger.error(f"Error replying to activity: {response.status}") + logger.error(f"Error replying to activity: {result or response.status}") response.raise_for_status() - data = await response.json() if response.content_length else {} logger.info( - f"Reply to conversation/activity: {data.get('id')}, {activity_id}" + f"Reply to conversation/activity: {result.get('id')}, {activity_id}" ) - return ResourceResponse.model_validate(data) + return ResourceResponse.model_validate(result) async def send_to_conversation( self, conversation_id: str, body: Activity diff --git a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/__init__.py b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/__init__.py index adf42987..7537dfb9 100644 --- a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/__init__.py +++ b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/__init__.py @@ -25,6 +25,16 @@ from .conversations_result import ConversationsResult from .expected_replies import ExpectedReplies from .entity import Entity +from .ai_entity import ( + AIEntity, + ClientCitation, + ClientCitationAppearance, + ClientCitationImage, + ClientCitationIconName, + SensitivityUsageInfo, + SensitivityPattern, + add_ai_to_activity, +) from 
.error import Error from .error_response import ErrorResponse from .fact import Fact @@ -109,6 +119,14 @@ "ConversationsResult", "ExpectedReplies", "Entity", + "AIEntity", + "ClientCitation", + "ClientCitationAppearance", + "ClientCitationImage", + "ClientCitationIconName", + "SensitivityUsageInfo", + "SensitivityPattern", + "add_ai_to_activity", "Error", "ErrorResponse", "Fact", diff --git a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py index 37be7d7a..67a359ad 100644 --- a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py +++ b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/activity.py @@ -1,7 +1,7 @@ from copy import copy from datetime import datetime, timezone from typing import Optional -from pydantic import Field +from pydantic import Field, SerializeAsAny from .activity_types import ActivityTypes from .channel_account import ChannelAccount from .conversation_account import ConversationAccount @@ -145,7 +145,7 @@ class Activity(AgentsModel): summary: NonEmptyString = None suggested_actions: SuggestedActions = None attachments: list[Attachment] = None - entities: list[Entity] = None + entities: list[SerializeAsAny[Entity]] = None channel_data: object = None action: NonEmptyString = None reply_to_id: NonEmptyString = None diff --git a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/ai_entity.py b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/ai_entity.py new file mode 100644 index 00000000..f7517f70 --- /dev/null +++ b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/ai_entity.py @@ -0,0 +1,142 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from enum import Enum +from typing import List, Optional, Union, Literal +from dataclasses import dataclass + +from .agents_model import AgentsModel +from .activity import Activity +from .entity import Entity + + +class ClientCitationIconName(str, Enum): + """Enumeration of supported citation icon names.""" + + MICROSOFT_WORD = "Microsoft Word" + MICROSOFT_EXCEL = "Microsoft Excel" + MICROSOFT_POWERPOINT = "Microsoft PowerPoint" + MICROSOFT_ONENOTE = "Microsoft OneNote" + MICROSOFT_SHAREPOINT = "Microsoft SharePoint" + MICROSOFT_VISIO = "Microsoft Visio" + MICROSOFT_LOOP = "Microsoft Loop" + MICROSOFT_WHITEBOARD = "Microsoft Whiteboard" + ADOBE_ILLUSTRATOR = "Adobe Illustrator" + ADOBE_PHOTOSHOP = "Adobe Photoshop" + ADOBE_INDESIGN = "Adobe InDesign" + ADOBE_FLASH = "Adobe Flash" + SKETCH = "Sketch" + SOURCE_CODE = "Source Code" + IMAGE = "Image" + GIF = "GIF" + VIDEO = "Video" + SOUND = "Sound" + ZIP = "ZIP" + TEXT = "Text" + PDF = "PDF" + + +class ClientCitationImage(AgentsModel): + """Information about the citation's icon.""" + + type: str = "ImageObject" + name: str = "" + + +class SensitivityPattern(AgentsModel): + """Pattern information for sensitivity usage info.""" + + type: str = "DefinedTerm" + in_defined_term_set: str = "" + name: str = "" + term_code: str = "" + + +class SensitivityUsageInfo(AgentsModel): + """ + Sensitivity usage info for content sent to the user. + This is used to provide information about the content to the user. 
+ """ + + type: str = "https://schema.org/Message" + schema_type: str = "CreativeWork" + description: Optional[str] = None + name: str = "" + position: Optional[int] = None + pattern: Optional[SensitivityPattern] = None + + +class ClientCitationAppearance(AgentsModel): + """Appearance information for a client citation.""" + + type: str = "DigitalDocument" + name: str = "" + text: Optional[str] = None + url: Optional[str] = None + abstract: str = "" + encoding_format: Optional[str] = None + image: Optional[ClientCitationImage] = None + keywords: Optional[List[str]] = None + usage_info: Optional[SensitivityUsageInfo] = None + + +class ClientCitation(AgentsModel): + """ + Represents a Teams client citation to be included in a message. + See Bot messages with AI-generated content for more details. + https://learn.microsoft.com/en-us/microsoftteams/platform/bots/how-to/bot-messages-ai-generated-content?tabs=before%2Cbotmessage + """ + + type: str = "Claim" + position: int = 0 + appearance: Optional[ClientCitationAppearance] = None + + def __post_init__(self): + if self.appearance is None: + self.appearance = ClientCitationAppearance() + + +class AIEntity(Entity): + """Entity indicating AI-generated content.""" + + type: str = "https://schema.org/Message" + schema_type: str = "Message" + context: str = "https://schema.org" + id: str = "" + additional_type: Optional[List[str]] = None + citation: Optional[List[ClientCitation]] = None + usage_info: Optional[SensitivityUsageInfo] = None + + def __post_init__(self): + if self.additional_type is None: + self.additional_type = ["AIGeneratedContent"] + + +def add_ai_to_activity( + activity: Activity, + citations: Optional[List[ClientCitation]] = None, + usage_info: Optional[SensitivityUsageInfo] = None, +) -> None: + """ + Adds AI entity to an activity to indicate AI-generated content. 
+ + Args: + activity: The activity to modify + citations: Optional list of citations + usage_info: Optional sensitivity usage information + """ + if citations: + ai_entity = AIEntity( + type="https://schema.org/Message", + schema_type="Message", + context="https://schema.org", + id="", + additional_type=["AIGeneratedContent"], + citation=citations, + usage_info=usage_info, + ) + + if activity.entities is None: + activity.entities = [] + + activity.entities.append(ai_entity) diff --git a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py index 9be42371..856601b5 100644 --- a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py +++ b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/delivery_modes.py @@ -6,3 +6,4 @@ class DeliveryModes(str, Enum): notification = "notification" expect_replies = "expectReplies" ephemeral = "ephemeral" + stream = "stream" diff --git a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/entity.py b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/entity.py index af250a86..08bbf242 100644 --- a/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/entity.py +++ b/libraries/Core/microsoft-agents-core/microsoft/agents/core/models/entity.py @@ -1,5 +1,8 @@ -from typing import Any +from typing import Any, Optional + +from pydantic import model_serializer, model_validator from .agents_model import AgentsModel, ConfigDict +from pydantic.alias_generators import to_camel, to_snake from ._type_aliases import NonEmptyString @@ -18,3 +21,18 @@ class Entity(AgentsModel): def additional_properties(self) -> dict[str, Any]: """Returns the set of properties that are not None.""" return self.model_extra + + @model_validator(mode="before") + @classmethod + def to_snake_for_all(cls, data): + ret = {to_snake(k): v for k, v in data.items()} + return ret + + @model_serializer(mode="plain") + def to_camel_for_all(self, config): + if config.by_alias: + new_data = {} + for k, v in self: + new_data[to_camel(k)] = v + return new_data + return {k: v for k, v in self} diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/__init__.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/__init__.py index 356f003d..ed572337 100644 --- a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/__init__.py +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/__init__.py @@ -6,6 +6,11 @@ jwt_authorization_middleware, jwt_authorization_decorator, ) +from .app.streaming import ( + Citation, + CitationUtil, + StreamingResponse, +) __all__ = [ "start_agent_process", @@ -14,4 +19,7 @@ "jwt_authorization_middleware", "jwt_authorization_decorator", "channel_service_route_table", + "Citation", + "CitationUtil", + "StreamingResponse", ] diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/__init__.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/__init__.py new file mode 100644 index 00000000..8216be63 --- /dev/null +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. 
+ +from .streaming import ( + Citation, + CitationUtil, + StreamingResponse, +) + +__all__ = [ + "Citation", + "CitationUtil", + "StreamingResponse", +] diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/__init__.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/__init__.py new file mode 100644 index 00000000..4cd61f38 --- /dev/null +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/__init__.py @@ -0,0 +1,12 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from .citation import Citation +from .citation_util import CitationUtil +from .streaming_response import StreamingResponse + +__all__ = [ + "Citation", + "CitationUtil", + "StreamingResponse", +] diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/citation.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/citation.py new file mode 100644 index 00000000..f643639a --- /dev/null +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/citation.py @@ -0,0 +1,22 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Optional +from dataclasses import dataclass + + +@dataclass +class Citation: + """Citations returned by the model.""" + + content: str + """The content of the citation.""" + + title: Optional[str] = None + """The title of the citation.""" + + url: Optional[str] = None + """The URL of the citation.""" + + filepath: Optional[str] = None + """The filepath of the document.""" diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/citation_util.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/citation_util.py new file mode 100644 index 00000000..b13ca598 --- /dev/null +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/citation_util.py @@ -0,0 +1,85 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import re +from typing import List, Optional + +from microsoft.agents.core.models import ClientCitation + + +class CitationUtil: + """Utility functions for manipulating text and citations.""" + + @staticmethod + def snippet(text: str, max_length: int) -> str: + """ + Clips the text to a maximum length in case it exceeds the limit. + + Args: + text: The text to clip. + max_length: The maximum length of the text to return, cutting off the last whole word. + + Returns: + The modified text + """ + if len(text) <= max_length: + return text + + snippet = text[:max_length] + snippet = snippet[: min(len(snippet), snippet.rfind(" "))] + snippet += "..." + return snippet + + @staticmethod + def format_citations_response(text: str) -> str: + """ + Convert citation tags `[doc(s)n]` to `[n]` where n is a number. + + Args: + text: The text to format. + + Returns: + The formatted text. + """ + return re.sub(r"\[docs?(\d+)\]", r"[\1]", text, flags=re.IGNORECASE) + + @staticmethod + def get_used_citations( + text: str, citations: List[ClientCitation] + ) -> Optional[List[ClientCitation]]: + """ + Get the citations used in the text. 
This will remove any citations that are + included in the citations array from the response but not referenced in the text. + + Args: + text: The text to search for citation references, i.e. [1], [2], etc. + citations: The list of citations to search for. + + Returns: + The list of citations used in the text. + """ + regex = re.compile(r"\[(\d+)\]", re.IGNORECASE) + matches = regex.findall(text) + + if not matches: + return None + + # Remove duplicates + filtered_matches = set(matches) + + # Add citations + used_citations = [] + for match in filtered_matches: + citation_ref = f"[{match}]" + found = next( + ( + citation + for citation in citations + if f"[{citation.position}]" == citation_ref + ), + None, + ) + if found: + used_citations.append(found) + + return used_citations if used_citations else None diff --git a/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py new file mode 100644 index 00000000..5d6ed581 --- /dev/null +++ b/libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py @@ -0,0 +1,412 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +import logging +from typing import List, Optional, Callable, Literal, TYPE_CHECKING +from dataclasses import dataclass + +from microsoft.agents.core.models import ( + Activity, + Entity, + Attachment, + Channels, + ClientCitation, + DeliveryModes, + SensitivityUsageInfo, + add_ai_to_activity, +) + +if TYPE_CHECKING: + from microsoft.agents.builder.turn_context import TurnContext + +from .citation import Citation +from .citation_util import CitationUtil + +logger = logging.getLogger(__name__) + + +class StreamingResponse: + """ + A helper class for streaming responses to the client. + + This class is used to send a series of updates to the client in a single response. + The expected sequence of calls is: + + `queue_informative_update()`, `queue_text_chunk()`, `queue_text_chunk()`, ..., `end_stream()`. + + Once `end_stream()` is called, the stream is considered ended and no further updates can be sent. + """ + + def __init__(self, context: "TurnContext"): + """ + Creates a new StreamingResponse instance. + + Args: + context: Context for the current turn of conversation with the user. + """ + self._context = context + self._sequence_number = 1 + self._stream_id: Optional[str] = None + self._message = "" + self._attachments: Optional[List[Attachment]] = None + self._ended = False + self._cancelled = False + + # Queue for outgoing activities + self._queue: List[Callable[[], Activity]] = [] + self._queue_sync: Optional[asyncio.Task] = None + self._chunk_queued = False + + # Powered by AI feature flags + self._enable_feedback_loop = False + self._feedback_loop_type: Optional[Literal["default", "custom"]] = None + self._enable_generated_by_ai_label = False + self._citations: Optional[List[ClientCitation]] = [] + self._sensitivity_label: Optional[SensitivityUsageInfo] = None + + # Channel information + self._is_streaming_channel: bool = False + self._channel_id: Channels = None + self._interval: float = 0.1 # Default interval for sending updates + self._set_defaults(context) + + @property + def stream_id(self) -> Optional[str]: + """ + Gets the stream ID of the current response. + Assigned after the initial update is sent. 
+ """ + return self._stream_id + + @property + def citations(self) -> Optional[List[ClientCitation]]: + """Gets the citations of the current response.""" + return self._citations + + @property + def updates_sent(self) -> int: + """Gets the number of updates sent for the stream.""" + return self._sequence_number - 1 + + def queue_informative_update(self, text: str) -> None: + """ + Queues an informative update to be sent to the client. + + Args: + text: Text of the update to send. + """ + if not self._is_streaming_channel: + return + + if self._ended: + raise RuntimeError("The stream has already ended.") + + # Queue a typing activity + def create_activity(): + activity = Activity( + type="typing", + text=text, + entities=[ + Entity( + type="streaminfo", + stream_type="informative", + stream_sequence=self._sequence_number, + ) + ], + ) + self._sequence_number += 1 + return activity + + self._queue_activity(create_activity) + + def queue_text_chunk( + self, text: str, citations: Optional[List[Citation]] = None + ) -> None: + """ + Queues a chunk of partial message text to be sent to the client. + + The text will be sent as quickly as possible to the client. + Chunks may be combined before delivery to the client. + + Args: + text: Partial text of the message to send. + citations: Citations to be included in the message. + """ + if self._cancelled: + return + if self._ended: + raise RuntimeError("The stream has already ended.") + + # Update full message text + self._message += text + + # If there are citations, modify the content so that the sources are numbers instead of [doc1], [doc2], etc. + self._message = CitationUtil.format_citations_response(self._message) + + # Queue the next chunk + self._queue_next_chunk() + + async def end_stream(self) -> None: + """ + Ends the stream by sending the final message to the client. + """ + if self._ended: + raise RuntimeError("The stream has already ended.") + + # Queue final message + self._ended = True + self._queue_next_chunk() + + # Wait for the queue to drain + await self.wait_for_queue() + + def set_attachments(self, attachments: List[Attachment]) -> None: + """ + Sets the attachments to attach to the final chunk. + + Args: + attachments: List of attachments. + """ + self._attachments = attachments + + def set_sensitivity_label(self, sensitivity_label: SensitivityUsageInfo) -> None: + """ + Sets the sensitivity label to attach to the final chunk. + + Args: + sensitivity_label: The sensitivity label. + """ + self._sensitivity_label = sensitivity_label + + def set_citations(self, citations: List[Citation]) -> None: + """ + Sets the citations for the full message. + + Args: + citations: Citations to be included in the message. + """ + if citations: + if not self._citations: + self._citations = [] + + curr_pos = len(self._citations) + + for citation in citations: + client_citation = ClientCitation( + type="Claim", + position=curr_pos + 1, + appearance={ + "type": "DigitalDocument", + "name": citation.title or f"Document #{curr_pos + 1}", + "abstract": CitationUtil.snippet(citation.content, 477), + }, + ) + curr_pos += 1 + self._citations.append(client_citation) + + def set_feedback_loop(self, enable_feedback_loop: bool) -> None: + """ + Sets the Feedback Loop in Teams that allows a user to + give thumbs up or down to a response. + Default is False. + + Args: + enable_feedback_loop: If true, the feedback loop is enabled. 
+ """ + self._enable_feedback_loop = enable_feedback_loop + + def set_feedback_loop_type( + self, feedback_loop_type: Literal["default", "custom"] + ) -> None: + """ + Sets the type of UI to use for the feedback loop. + + Args: + feedback_loop_type: The type of the feedback loop. + """ + self._feedback_loop_type = feedback_loop_type + + def set_generated_by_ai_label(self, enable_generated_by_ai_label: bool) -> None: + """ + Sets the Generated by AI label in Teams. + Default is False. + + Args: + enable_generated_by_ai_label: If true, the label is added. + """ + self._enable_generated_by_ai_label = enable_generated_by_ai_label + + def get_message(self) -> str: + """ + Returns the most recently streamed message. + """ + return self._message + + async def wait_for_queue(self) -> None: + """ + Waits for the outgoing activity queue to be empty. + """ + if self._queue_sync: + await self._queue_sync + + def _set_defaults(self, context: "TurnContext"): + if context.activity.channel_id == Channels.ms_teams: + self._is_streaming_channel = True + self._interval = 1.0 + elif context.activity.channel_id == Channels.direct_line: + self._is_streaming_channel = True + self._interval = 0.5 + elif context.activity.delivery_mode == DeliveryModes.stream: + self._is_streaming_channel = True + self._interval = 0.1 + + self._channel_id = context.activity.channel_id + + def _queue_next_chunk(self) -> None: + """ + Queues the next chunk of text to be sent to the client. + """ + # Are we already waiting to send a chunk? + if self._chunk_queued: + return + + # Queue a chunk of text to be sent + self._chunk_queued = True + + def create_activity(): + self._chunk_queued = False + if self._ended: + # Send final message + activity = Activity( + type="message", + text=self._message or "end stream response", + attachments=self._attachments or [], + entities=[ + Entity( + type="streaminfo", + stream_type="final", + stream_sequence=self._sequence_number, + ) + ], + ) + elif self._is_streaming_channel: + # Send typing activity + activity = Activity( + type="typing", + text=self._message, + entities=[ + Entity( + type="streaminfo", + stream_type="streaming", + stream_sequence=self._sequence_number, + ) + ], + ) + else: + return + self._sequence_number += 1 + return activity + + self._queue_activity(create_activity) + + def _queue_activity(self, factory: Callable[[], Activity]) -> None: + """ + Queues an activity to be sent to the client. + """ + self._queue.append(factory) + + # If there's no sync in progress, start one + if not self._queue_sync: + self._queue_sync = asyncio.create_task(self._drain_queue()) + + async def _drain_queue(self) -> None: + """ + Sends any queued activities to the client until the queue is empty. + """ + try: + logger.debug(f"Draining queue with {len(self._queue)} activities.") + while self._queue: + factory = self._queue.pop(0) + activity = factory() + if activity: + await self._send_activity(activity) + except Exception as err: + if ( + "403" in str(err) + and self._context.activity.channel_id == Channels.ms_teams + ): + logger.warning("Teams channel stopped the stream.") + self._cancelled = True + else: + logger.error( + f"Error occurred when sending activity while streaming: {err}" + ) + raise + finally: + self._queue_sync = None + + async def _send_activity(self, activity: Activity) -> None: + """ + Sends an activity to the client and saves the stream ID returned. + + Args: + activity: The activity to send. 
+ """ + + streaminfo_entity = None + + if not activity.entities: + streaminfo_entity = Entity(type="streaminfo") + activity.entities = [streaminfo_entity] + else: + for entity in activity.entities: + if hasattr(entity, "type") and entity.type == "streaminfo": + streaminfo_entity = entity + break + + if not streaminfo_entity: + # If no streaminfo entity exists, create one + streaminfo_entity = Entity(type="streaminfo") + activity.entities.append(streaminfo_entity) + + # Set activity ID to the assigned stream ID + if self._stream_id: + activity.id = self._stream_id + streaminfo_entity.stream_id = self._stream_id + + if self._citations and len(self._citations) > 0 and not self._ended: + # Filter out the citations unused in content. + curr_citations = CitationUtil.get_used_citations( + self._message, self._citations + ) + if curr_citations: + activity.entities.append( + Entity( + type="https://schema.org/Message", + schema_type="Message", + context="https://schema.org", + id="", + citation=curr_citations, + ) + ) + + # Add in Powered by AI feature flags + if self._ended: + if self._enable_feedback_loop and self._feedback_loop_type: + # Add feedback loop to streaminfo entity + streaminfo_entity.feedback_loop = {"type": self._feedback_loop_type} + else: + # Add feedback loop enabled to streaminfo entity + streaminfo_entity.feedback_loop_enabled = self._enable_feedback_loop + # Add in Generated by AI + if self._enable_generated_by_ai_label: + add_ai_to_activity(activity, self._citations, self._sensitivity_label) + + # Send activity + response = await self._context.send_activity(activity) + await asyncio.sleep(self._interval) + + # Save assigned stream ID + if not self._stream_id and response: + self._stream_id = response.id diff --git a/test_samples/app_style/streaming_agent.py b/test_samples/app_style/streaming_agent.py new file mode 100644 index 00000000..36b0ebd8 --- /dev/null +++ b/test_samples/app_style/streaming_agent.py @@ -0,0 +1,112 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +from os import environ +import re +import sys +import traceback + +from dotenv import load_dotenv +from microsoft.agents.builder.app import AgentApplication, TurnState +from microsoft.agents.builder.app.oauth import Authorization +from microsoft.agents.hosting.aiohttp import CloudAdapter, Citation +from microsoft.agents.authentication.msal import MsalConnectionManager + +from microsoft.agents.builder import ( + TurnContext, + MessageFactory, +) +from microsoft.agents.storage import MemoryStorage +from microsoft.agents.core import load_configuration_from_env +from microsoft.agents.core.models import Activity, ActivityTypes, SensitivityUsageInfo + +from shared import start_server + +load_dotenv() + +agents_sdk_config = load_configuration_from_env(environ) + +STORAGE = MemoryStorage() +CONNECTION_MANAGER = MsalConnectionManager(**agents_sdk_config) +ADAPTER = CloudAdapter(connection_manager=CONNECTION_MANAGER) +AUTHORIZATION = Authorization(STORAGE, CONNECTION_MANAGER, **agents_sdk_config) + +AGENT_APP = AgentApplication[TurnState]( + storage=STORAGE, adapter=ADAPTER, authorization=AUTHORIZATION, **agents_sdk_config +) + + +@AGENT_APP.activity(ActivityTypes.invoke) +async def invoke(context: TurnContext, state: TurnState) -> str: + """ + Internal method to process template expansion or function invocation. 
+ """ + invoke_response = Activity( + type=ActivityTypes.invoke_response, value={"status": 200} + ) + print(f"Invoke activity received: {context.activity}") + await context.send_activity(invoke_response) + + +@AGENT_APP.conversation_update("membersAdded") +async def on_members_added(context: TurnContext, _state: TurnState): + await context.send_activity( + "Welcome to the Streaming sample, send a message to see the echo feature in action." + ) + return True + + +@AGENT_APP.activity("message") +async def on_message(context: TurnContext, state: TurnState): + context.streaming_response.set_feedback_loop(True) + context.streaming_response.set_sensitivity_label( + SensitivityUsageInfo( + type="https://schema.org/Message", + schema_type="CreativeWork", + name="Internal", + ) + ) + context.streaming_response.set_generated_by_ai_label(True) + context.streaming_response.queue_informative_update("starting streaming response") + await asyncio.sleep(1) + + for i in range(5): + print(f"Streaming chunk {i + 1}") + context.streaming_response.queue_text_chunk(f"part [{i + 1}] ") + await asyncio.sleep(i * 0.5) + + context.streaming_response.queue_text_chunk( + "This is the final message part. [doc1] and [doc2]" + ) + context.streaming_response.set_citations( + [ + Citation(title="Citation1", content="file", filepath="", url="file:////"), + Citation( + title="Citation2", + content="loooonger content", + filepath="", + url="file:////", + ), + ] + ) + + await context.streaming_response.end_stream() + + +@AGENT_APP.error +async def on_error(context: TurnContext, error: Exception): + # This check writes out errors to console log .vs. app insights. + # NOTE: In production environment, you should consider logging this to Azure + # application insights. + print(f"\n [on_turn_error] unhandled error: {error}", file=sys.stderr) + traceback.print_exc() + + # Send a message to the user + await context.send_activity("The bot encountered an error or bug.") + + +start_server( + agent_application=AGENT_APP, + auth_configuration=CONNECTION_MANAGER.get_default_connection_configuration(), +)