From a21845a0972a0d390972b998fdf21a46a7b29cda Mon Sep 17 00:00:00 2001 From: Axel Suarez Date: Wed, 29 Oct 2025 21:03:23 -0700 Subject: [PATCH 1/3] Proactive sample WIP --- test_samples/app_style/README.md | 44 ++ test_samples/app_style/emtpy_agent.py | 27 +- test_samples/app_style/env.TEMPLATE | 12 +- .../app_style/proactive_messaging_agent.py | 389 ++++++++++++++++++ 4 files changed, 469 insertions(+), 3 deletions(-) create mode 100644 test_samples/app_style/README.md create mode 100644 test_samples/app_style/proactive_messaging_agent.py diff --git a/test_samples/app_style/README.md b/test_samples/app_style/README.md new file mode 100644 index 00000000..4c79ffb0 --- /dev/null +++ b/test_samples/app_style/README.md @@ -0,0 +1,44 @@ +# App-style samples + +This folder contains end-to-end samples that resemble production “app-style” experiences built on the Microsoft 365 Agents Python SDK. The new proactive messaging sample shows how to start a Microsoft Teams conversation or send a proactive message to an existing one. + +## Proactive messaging sample + +`proactive_messaging_agent.py` hosts two HTTP endpoints: + +- `POST /api/createconversation` – creates a new 1:1 Teams conversation with a user and optionally sends an initial message. +- `POST /api/sendmessage` – sends another proactive message to an existing conversation id. + +### Prerequisites + +1. Python 3.10 or later. +2. Install the Agents Python SDK packages (for example by running `pip install -e libraries/microsoft-agents-*`). +3. A published Copilot Studio agent configured for Teams with application (client) ID, client secret, and tenant ID. + +### Configure environment variables + +1. Copy `env.TEMPLATE` to `.env` if you have not already. +2. Populate the connection settings used to acquire tokens: + - `CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTID` + - `CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTSECRET` + - `CONNECTIONS__SERVICE_CONNECTION__SETTINGS__TENANTID` +3. Add the proactive messaging settings from the template (bot id, agent id, tenant id, service URL, etc.). Optionally set `PROACTIVEMESSAGING__USERAADOBJECTID` to provide a default recipient. +4. Leave `TOKENVALIDATION__ENABLED=false` for local testing. Set it to `true` and supply a valid bearer token when calling the APIs if you need auth checks. + +### Run the sample + +```pwsh +python proactive_messaging_agent.py +``` + +The server listens on `http://localhost:5199` by default. Use the following helper commands to exercise the endpoints (replace the sample values with your own IDs): + +```pwsh +# Create a new conversation (returns conversationId) +Invoke-RestMethod -Method POST -Uri "http://localhost:5199/api/createconversation" -ContentType "application/json" -Body (@{ Message = "Hello from proactive sample"; UserAadObjectId = "00000000-0000-0000-0000-000000000123" } | ConvertTo-Json) + +# Send another proactive message +Invoke-RestMethod -Method POST -Uri "http://localhost:5199/api/sendmessage" -ContentType "application/json" -Body (@{ ConversationId = ""; Message = "Second proactive ping" } | ConvertTo-Json) +``` + +When `TOKENVALIDATION__ENABLED` is `true`, add an `Authorization: Bearer ` header to each call. The proactive endpoints will respond with JSON payloads describing success or validation errors. diff --git a/test_samples/app_style/emtpy_agent.py b/test_samples/app_style/emtpy_agent.py index dad36347..e18f5d6b 100644 --- a/test_samples/app_style/emtpy_agent.py +++ b/test_samples/app_style/emtpy_agent.py @@ -1,6 +1,13 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +import logging +from os import environ, path +from dotenv import load_dotenv +from microsoft_agents.activity import load_configuration_from_env +from microsoft_agents.authentication.msal.msal_connection_manager import ( + MsalConnectionManager, +) from microsoft_agents.hosting.core import ( AgentApplication, TurnState, @@ -8,10 +15,23 @@ MemoryStorage, ) from microsoft_agents.hosting.aiohttp import CloudAdapter +from microsoft_agents.hosting.core.app.oauth.authorization import Authorization from shared import start_server -AGENT_APP = AgentApplication[TurnState](storage=MemoryStorage(), adapter=CloudAdapter()) +logging.basicConfig(level=logging.INFO) +load_dotenv(path.join(path.dirname(__file__), ".env")) + +agents_sdk_config = load_configuration_from_env(environ) + +STORAGE = MemoryStorage() +CONNECTION_MANAGER = MsalConnectionManager(**agents_sdk_config) +ADAPTER = CloudAdapter(connection_manager=CONNECTION_MANAGER) +AUTHORIZATION = Authorization(STORAGE, CONNECTION_MANAGER, **agents_sdk_config) + +AGENT_APP = AgentApplication[TurnState]( + storage=STORAGE, adapter=ADAPTER, authorization=AUTHORIZATION, **agents_sdk_config +) async def _help(context: TurnContext, _state: TurnState): @@ -33,6 +53,9 @@ async def on_message(context: TurnContext, _): if __name__ == "__main__": try: - start_server(AGENT_APP, None) + start_server( + agent_application=AGENT_APP, + auth_configuration=CONNECTION_MANAGER.get_default_connection_configuration(), + ) except Exception as error: raise error diff --git a/test_samples/app_style/env.TEMPLATE b/test_samples/app_style/env.TEMPLATE index f9adb9ac..89af6af9 100644 --- a/test_samples/app_style/env.TEMPLATE +++ b/test_samples/app_style/env.TEMPLATE @@ -16,4 +16,14 @@ AGENTAPPLICATION__USERAUTHORIZATION__HANDLERS__MCS__SETTINGS__OBOCONNECTIONNAME= COPILOTSTUDIOAGENT__ENVIRONMENTID=environment-id COPILOTSTUDIOAGENT__SCHEMANAME=schema-name COPILOTSTUDIOAGENT__TENANTID=tenant-id -COPILOTSTUDIOAGENT__AGENTAPPID=agent-app-id \ No newline at end of file +COPILOTSTUDIOAGENT__AGENTAPPID=agent-app-id + +# Proactive messaging sample settings +PROACTIVEMESSAGING__BOTID=28:teams-app-id +PROACTIVEMESSAGING__AGENTID=teams-app-id +PROACTIVEMESSAGING__TENANTID=tenant-id +PROACTIVEMESSAGING__SCOPE=https://api.botframework.com/.default +PROACTIVEMESSAGING__CHANNELID=msteams +PROACTIVEMESSAGING__SERVICEURL=https://smba.trafficmanager.net/teams/ +# Optional default user (if not supplied per request) +PROACTIVEMESSAGING__USERAADOBJECTID= diff --git a/test_samples/app_style/proactive_messaging_agent.py b/test_samples/app_style/proactive_messaging_agent.py new file mode 100644 index 00000000..b3b9d882 --- /dev/null +++ b/test_samples/app_style/proactive_messaging_agent.py @@ -0,0 +1,389 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +"""Proactive messaging sample demonstrating how to start and continue a Teams conversation.""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass +from os import environ, path +from typing import Any, Dict, Optional + +from aiohttp import web +from dotenv import load_dotenv + +from microsoft_agents.activity import ( + ChannelAccount, + ConversationAccount, + ConversationParameters, + ConversationReference, + # load_configuration_from_env, +) +from microsoft_agents.authentication.msal import MsalConnectionManager +from microsoft_agents.hosting.aiohttp import CloudAdapter, jwt_authorization_middleware +from microsoft_agents.hosting.core import ( + AgentApplication, + Authorization, + AuthenticationConstants, + ClaimsIdentity, + MessageFactory, + MemoryStorage, + TurnContext, + TurnState, +) + + +# TODO: libraries will be updated to add current changes to this method +def load_configuration_from_env( + env_vars: dict[str, Any], custom_config_keys: list[str] = None +) -> dict: + """ + Parses environment variables and returns a dictionary with the relevant configuration. + """ + custom_config_keys = custom_config_keys or [] + vars = env_vars.copy() + result = {} + for key, value in vars.items(): + levels = key.split("__") + current_level = result + last_level = None + for next_level in levels: + if next_level not in current_level: + current_level[next_level] = {} + last_level = current_level + current_level = current_level[next_level] + last_level[levels[-1]] = value + + if result.get("CONNECTIONSMAP") and isinstance(result["CONNECTIONSMAP"], dict): + result["CONNECTIONSMAP"] = [ + conn for conn in result.get("CONNECTIONSMAP", {}).values() + ] + + configuration = { + "AGENTAPPLICATION": result.get("AGENTAPPLICATION", {}), + "CONNECTIONS": result.get("CONNECTIONS", {}), + "CONNECTIONSMAP": result.get("CONNECTIONSMAP", {}), + } + configuration.update( + {key: result[key] for key in custom_config_keys if key in result} + ) + + return configuration + + +@dataclass +class ProactiveMessagingSettings: + """Configuration required to perform proactive operations.""" + + bot_id: str + agent_id: str + tenant_id: str + scope: str = "https://api.botframework.com/.default" + channel_id: str = "msteams" + service_url: str = "https://smba.trafficmanager.net/teams/" + default_user_aad_object_id: Optional[str] = None + + @classmethod + def from_dict(cls, raw: Dict[str, Any]) -> "ProactiveMessagingSettings": + try: + return cls( + bot_id=raw["BOTID"], + agent_id=raw["AGENTID"], + tenant_id=raw["TENANTID"], + scope=raw.get("SCOPE", cls.scope), + channel_id=raw.get("CHANNELID", cls.channel_id), + service_url=raw.get("SERVICEURL", cls.service_url), + default_user_aad_object_id=raw.get("USERAADOBJECTID"), + ) + except KeyError as exc: # pragma: no cover - defensive guard + missing = exc.args[0] + raise ValueError( + f"Missing required PROACTIVEMESSAGING__{missing} environment variable" + ) from exc + + +class ProactiveMessenger: + """Encapsulates proactive conversation helpers for Teams.""" + + def __init__( + self, + adapter: CloudAdapter, + settings: ProactiveMessagingSettings, + ) -> None: + self._adapter = adapter + self._settings = settings + + async def create_conversation( + self, + message: Optional[str], + user_aad_object_id: Optional[str], + ) -> str: + """Create a new conversation and optionally seed it with an initial message.""" + + effective_user = user_aad_object_id or self._settings.default_user_aad_object_id + if not effective_user: + raise ValueError( + "UserAadObjectId is required when no default is configured." + ) + + members = [ChannelAccount(id=effective_user)] + parameters = ConversationParameters( + is_group=False, + agent=ChannelAccount(id=self._settings.agent_id), + members=members, + tenant_id=self._settings.tenant_id, + ) + + conversation_id: Optional[str] = None + + async def _callback(turn_context: TurnContext) -> None: + nonlocal conversation_id + conversation_id = turn_context.activity.conversation.id + if message: + await turn_context.send_activity(MessageFactory.text(message)) + + await self._adapter.create_conversation( + self._settings.bot_id, + self._settings.channel_id, + self._settings.service_url, + self._settings.scope, + parameters, + _callback, + ) + + if not conversation_id: + raise RuntimeError("Conversation id was not returned by the channel.") + + return conversation_id + + async def send_message(self, conversation_id: str, message: str) -> None: + """Send a proactive message into an existing conversation.""" + + if not conversation_id: + raise ValueError("conversationId is required.") + if not message: + raise ValueError("message is required.") + + # TODO: user field not really needed but requires library update + reference = ConversationReference( + channel_id=self._settings.channel_id, + service_url=self._settings.service_url, + agent=ChannelAccount(id=self._settings.agent_id), + conversation=ConversationAccount( + id=conversation_id, tenant_id=self._settings.tenant_id + ), + user=ChannelAccount(id=self._settings.agent_id), + ) + continuation_activity = reference.get_continuation_activity() + + # TODO: activity id as parameter requires library update + continuation_activity.id = conversation_id + "|0001022" + + async def _callback(turn_context: TurnContext) -> None: + await turn_context.send_activity(MessageFactory.text(message)) + + await self._adapter.continue_conversation_with_claims( + self._create_identity( + self._settings.bot_id, + self._settings.agent_id, + ), + continuation_activity, + _callback, + "https://api.botframework.com", + ) + + @staticmethod + def _create_identity(audience: str, app_id: str) -> ClaimsIdentity: + """Create a claims identity for proactive messaging.""" + return ClaimsIdentity( + claims={ + AuthenticationConstants.AUDIENCE_CLAIM: audience, + AuthenticationConstants.APP_ID_CLAIM: app_id, + }, + is_authenticated=True, + ) + + +async def _read_optional_json(request: web.Request) -> Dict[str, Any]: + if request.content_length in (0, None): + return {} + try: + return await request.json() + except json.JSONDecodeError: + return {} + + +def create_app() -> web.Application: + """Create and configure the aiohttp application hosting the sample.""" + + load_dotenv(path.join(path.dirname(__file__), ".env")) + + # Load base Agents SDK configuration that the core components expect. + agents_sdk_config = load_configuration_from_env( + environ, custom_config_keys=["PROACTIVEMESSAGING"] + ) + + storage = MemoryStorage() + connection_manager = MsalConnectionManager(**agents_sdk_config) + adapter = CloudAdapter(connection_manager=connection_manager) + authorization = Authorization(storage, connection_manager, **agents_sdk_config) + agent_app = AgentApplication[TurnState]( + storage=storage, + adapter=adapter, + authorization=authorization, + **agents_sdk_config.get("AGENTAPPLICATION", {}), + ) + adapter.on_turn_error = _default_error_handler + + proactive_raw = agents_sdk_config.get("PROACTIVEMESSAGING", {}) + settings = ProactiveMessagingSettings.from_dict(proactive_raw) + messenger = ProactiveMessenger(adapter, settings) + + middlewares = [] + agent_config = connection_manager.get_default_connection_configuration() + if not agent_config: + raise ValueError("SERVICE_CONNECTION settings are missing.") + # middlewares.append(jwt_authorization_middleware) + + app = web.Application(middlewares=middlewares) + app["adapter"] = adapter + app["agent_app"] = agent_app + app["messenger"] = messenger + app["agent_configuration"] = agent_config + + app.router.add_get("/", _handle_root) + app.router.add_get("/healthz", _handle_health) + app.router.add_post("/api/createconversation", _handle_create_conversation) + app.router.add_post("/api/sendmessage", _handle_send_message) + + return app + + +async def _handle_root(request: web.Request) -> web.Response: + return web.json_response({"status": "ready", "sample": "proactive-messaging"}) + + +async def _handle_health(request: web.Request) -> web.Response: + return web.json_response({"status": "ok"}) + + +async def _handle_create_conversation(request: web.Request) -> web.Response: + messenger: ProactiveMessenger = request.app["messenger"] + payload = await _read_optional_json(request) + + message = payload.get("message") or payload.get("Message") + user_aad_object_id = payload.get("userAadObjectId") or payload.get( + "UserAadObjectId" + ) + + try: + conversation_id = await messenger.create_conversation( + message=message, user_aad_object_id=user_aad_object_id + ) + except ValueError as exc: + return web.json_response( + { + "status": "Error", + "error": { + "code": "Validation", + "message": str(exc), + }, + }, + status=400, + ) + except Exception as exc: # pragma: no cover - sample logging + logging.exception("Create conversation failed") + return web.json_response( + { + "status": "Error", + "error": { + "code": "ServerError", + "message": str(exc), + }, + }, + status=500, + ) + + return web.json_response( + { + "conversationId": conversation_id, + "status": "Created", + }, + status=201, + ) + + +async def _handle_send_message(request: web.Request) -> web.Response: + messenger: ProactiveMessenger = request.app["messenger"] + payload = await _read_optional_json(request) + + conversation_id = payload.get("conversationId") or payload.get("ConversationId") + message = payload.get("message") or payload.get("Message") + + if not conversation_id or not message: + return web.json_response( + { + "status": "Error", + "error": { + "code": "Validation", + "message": "conversationId and message are required", + }, + }, + status=400, + ) + + try: + await messenger.send_message(conversation_id, message) + except ValueError as exc: + return web.json_response( + { + "status": "Error", + "error": { + "code": "Validation", + "message": str(exc), + }, + }, + status=400, + ) + except Exception as exc: # pragma: no cover - sample logging + logging.exception("Send message failed") + return web.json_response( + { + "status": "Error", + "error": { + "code": "ServerError", + "message": str(exc), + }, + }, + status=500, + ) + + return web.json_response( + { + "conversationId": conversation_id, + "status": "Delivered", + } + ) + + +async def _default_error_handler(context: TurnContext, error: Exception) -> None: + logging.exception("Adapter pipeline failure", exc_info=error) + await context.send_activity( + "The proactive messaging agent encountered an unexpected error." + ) + + +def main() -> None: + logging.basicConfig(level=logging.INFO) + app = create_app() + + host = environ.get("HOST", "localhost") + port = int(environ.get("PORT", "5199")) + + web.run_app(app, host=host, port=port) + + +if __name__ == "__main__": + main() From d7109f28399f5dd9cd5f3f4de1811abf9fc8c320 Mon Sep 17 00:00:00 2001 From: Axel Suarez Martinez Date: Thu, 30 Oct 2025 11:47:53 -0700 Subject: [PATCH 2/3] Proactive echo in teams WIP --- .../core/authorization/claims_identity.py | 2 +- .../hosting/core/channel_service_adapter.py | 2 +- .../app_style/echo_proactive_agent.py | 253 ++++++++++++++++++ test_samples/app_style/emtpy_agent.py | 2 +- .../app_style/proactive_messaging_agent.py | 17 +- 5 files changed, 265 insertions(+), 11 deletions(-) create mode 100644 test_samples/app_style/echo_proactive_agent.py diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/authorization/claims_identity.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/authorization/claims_identity.py index af30b409..94f77225 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/authorization/claims_identity.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/authorization/claims_identity.py @@ -78,4 +78,4 @@ def get_token_audience(self) -> str: :return: The token audience. """ - return f"app://{self.get_outgoing_app_id()}" + return f"app://{self.get_outgoing_app_id()}" if self.is_agent_claim() else AuthenticationConstants.AGENTS_SDK_SCOPE diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py index 2b4e6969..e9489971 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py @@ -232,7 +232,7 @@ async def continue_conversation_with_claims( :type audience: Optional[str] """ return await self.process_proactive( - claims_identity, continuation_activity, audience, callback + claims_identity, continuation_activity, audience or claims_identity.get_token_audience(), callback ) async def create_conversation( # pylint: disable=arguments-differ diff --git a/test_samples/app_style/echo_proactive_agent.py b/test_samples/app_style/echo_proactive_agent.py new file mode 100644 index 00000000..ada6f802 --- /dev/null +++ b/test_samples/app_style/echo_proactive_agent.py @@ -0,0 +1,253 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from __future__ import annotations + +import logging +from dataclasses import dataclass +from os import environ, path +from typing import Any + +from dotenv import load_dotenv + +from microsoft_agents.activity import ( + Activity, + ActivityTypes, + ConversationReference, + EndOfConversationCodes, + MessageFactory, + load_configuration_from_env, +) +from microsoft_agents.authentication.msal import MsalConnectionManager +from microsoft_agents.hosting.aiohttp import CloudAdapter +from microsoft_agents.hosting.core import ( + AgentApplication, + Authorization, + ConversationState, + MemoryStorage, + TurnContext, + TurnState, +) +from microsoft_agents.hosting.core.app._routes import RouteRank +from microsoft_agents.hosting.core.authorization import ClaimsIdentity +from microsoft_agents.hosting.core.storage import StoreItem + +from shared import start_server + +logger = logging.getLogger(__name__) + + +@dataclass +class ConversationReferenceRecord(StoreItem): + """Persistable envelope for a conversation reference and associated identity.""" + + claims: dict[str, str] + is_authenticated: bool + authentication_type: str | None + reference: ConversationReference + + @staticmethod + def get_key(conversation_id: str) -> str: + return f"conversationreferences/{conversation_id}" + + @property + def key(self) -> str: + return self.get_key(self.reference.conversation.id) + + @classmethod + def from_context(cls, context: TurnContext) -> "ConversationReferenceRecord": + identity = context.identity or ClaimsIdentity({}, False) + reference = context.activity.get_conversation_reference() + return cls( + claims=dict(identity.claims), + is_authenticated=identity.is_authenticated, + authentication_type=identity.authentication_type, + reference=reference, + ) + + def to_identity(self) -> ClaimsIdentity: + return ClaimsIdentity( + claims=dict(self.claims), + is_authenticated=self.is_authenticated, + authentication_type=self.authentication_type, + ) + + def store_item_to_json(self) -> dict[str, Any]: + return { + "claims": dict(self.claims), + "is_authenticated": self.is_authenticated, + "authentication_type": self.authentication_type, + "reference": self.reference.model_dump(mode="json"), + } + + @staticmethod + def from_json_to_store_item( + json_data: dict[str, Any] + ) -> "ConversationReferenceRecord": + reference_data = json_data.get("reference") + if not reference_data: + raise ValueError("Conversation reference payload is missing.") + reference = ConversationReference.model_validate(reference_data) + return ConversationReferenceRecord( + claims=json_data.get("claims", {}), + is_authenticated=json_data.get("is_authenticated", False), + authentication_type=json_data.get("authentication_type"), + reference=reference, + ) + + +class EchoSkill(AgentApplication[TurnState]): + """AgentApplication equivalent of the Copilot Studio Echo skill sample.""" + + def __init__( + self, + *, + storage: MemoryStorage, + adapter: CloudAdapter, + authorization: Authorization, + connection_manager: MsalConnectionManager, + **options: Any, + ) -> None: + super().__init__( + connection_manager=connection_manager, + storage=storage, + adapter=adapter, + authorization=authorization, + **options, + ) + + self._storage = storage + self._conversation_state = ConversationState(storage) + + self.conversation_update("membersAdded")(self._welcome_message) + self.activity(ActivityTypes.end_of_conversation)( + self._handle_end_of_conversation + ) + self.activity(ActivityTypes.message, rank=RouteRank.LAST)(self._on_message) + self.error(self._handle_turn_error) + + async def _welcome_message( + self, context: TurnContext, _: TurnState + ) -> None: # pragma: no cover - sample behaviour + for member in context.activity.members_added or []: + recipient_id = context.activity.recipient.id if context.activity.recipient else None + if member.id != recipient_id: + await context.send_activity("Hi, This is EchoSkill") + + async def _handle_end_of_conversation( + self, context: TurnContext, state: TurnState + ) -> None: + await state.conversation.delete(context) + conversation = context.activity.conversation + if conversation and conversation.id: + await self._storage.delete([ConversationReferenceRecord.get_key(conversation.id)]) + + async def _on_message(self, context: TurnContext, state: TurnState) -> None: + text = context.activity.text or "" + if "end" in text: + await context.send_activity("(EchoSkill) Ending conversation...") + end_activity = Activity.create_end_of_conversation_activity() + end_activity.code = EndOfConversationCodes.completed_successfully + await context.send_activity(end_activity) + await state.conversation.delete(context) + return + + record = ConversationReferenceRecord.from_context(context) + await self._storage.write({record.key: record}) # Keep reference for proactive sends. + + await context.send_activity(MessageFactory.text(f"(EchoSkill): {text}")) + await context.send_activity( + MessageFactory.text('(EchoSkill): Say "end" and I\'ll end the conversation.') + ) + await state.save(context) + + async def _handle_turn_error( + self, context: TurnContext, error: Exception + ) -> None: # pragma: no cover - sample logging + logger.exception("Unhandled error during turn", exc_info=error) + + await self._conversation_state.load(context) + await self._conversation_state.delete(context) + + await context.send_activity( + MessageFactory.text(f"Error during turn, ending conversation ({error})") + ) + end_activity = Activity.create_end_of_conversation_activity() + end_activity.code = EndOfConversationCodes.error + end_activity.text = str(error) + await context.send_activity(end_activity) + + async def send_activity_to_conversation( + self, conversation_id: str, activity: Activity + ) -> bool: + if not conversation_id or not activity: + return False + + key = ConversationReferenceRecord.get_key(conversation_id) + items = await self._storage.read([key], target_cls=ConversationReferenceRecord) + record = items.get(key) + if not record: + return False + + continuation_activity = record.reference.get_continuation_activity() + + async def _callback(turn_context: TurnContext) -> None: + await turn_context.send_activity(activity) + + await self.adapter.continue_conversation_with_claims( + record.to_identity(), continuation_activity, _callback + ) + return True + + +@dataclass +class SendActivityRequest: + conversation_id: str + activity: Activity + + @classmethod + def from_dict(cls, payload: dict[str, Any]) -> "SendActivityRequest": + conversation_id = payload.get("conversationId") or payload.get("conversation_id") + activity_payload = payload.get("activity") + if not conversation_id or not activity_payload: + raise ValueError("conversationId and activity are required.") + activity = Activity.model_validate(activity_payload) + return cls(conversation_id=conversation_id, activity=activity) + + +load_dotenv(path.join(path.dirname(__file__), ".env")) + +agents_sdk_config = load_configuration_from_env(environ) + +STORAGE = MemoryStorage() +CONNECTION_MANAGER = MsalConnectionManager(**agents_sdk_config) +ADAPTER = CloudAdapter(connection_manager=CONNECTION_MANAGER) +AUTHORIZATION = Authorization(STORAGE, CONNECTION_MANAGER, **agents_sdk_config) + +ECHO_SKILL = EchoSkill( + storage=STORAGE, + adapter=ADAPTER, + authorization=AUTHORIZATION, + connection_manager=CONNECTION_MANAGER, + **agents_sdk_config.get("AGENTAPPLICATION", {}), +) + + +async def send_activity_to_conversation( + conversation_id: str, activity: Activity +) -> bool: + """Module-level convenience wrapper matching the C# sample signature.""" + + return await ECHO_SKILL.send_activity_to_conversation(conversation_id, activity) + + +def main() -> None: + logging.basicConfig(level=logging.INFO) + start_server( + agent_application=ECHO_SKILL, + auth_configuration=CONNECTION_MANAGER.get_default_connection_configuration(), + ) + + +if __name__ == "__main__": + main() diff --git a/test_samples/app_style/emtpy_agent.py b/test_samples/app_style/emtpy_agent.py index e18f5d6b..c68083c9 100644 --- a/test_samples/app_style/emtpy_agent.py +++ b/test_samples/app_style/emtpy_agent.py @@ -5,7 +5,7 @@ from os import environ, path from dotenv import load_dotenv from microsoft_agents.activity import load_configuration_from_env -from microsoft_agents.authentication.msal.msal_connection_manager import ( +from microsoft_agents.authentication.msal import ( MsalConnectionManager, ) from microsoft_agents.hosting.core import ( diff --git a/test_samples/app_style/proactive_messaging_agent.py b/test_samples/app_style/proactive_messaging_agent.py index b3b9d882..36e37cf2 100644 --- a/test_samples/app_style/proactive_messaging_agent.py +++ b/test_samples/app_style/proactive_messaging_agent.py @@ -170,16 +170,16 @@ async def send_message(self, conversation_id: str, message: str) -> None: reference = ConversationReference( channel_id=self._settings.channel_id, service_url=self._settings.service_url, - agent=ChannelAccount(id=self._settings.agent_id), + agent=ChannelAccount(id="<>", name="<>"), conversation=ConversationAccount( - id=conversation_id, tenant_id=self._settings.tenant_id + id=conversation_id ), - user=ChannelAccount(id=self._settings.agent_id), + user=ChannelAccount(id="user_id", name="user_name"), ) continuation_activity = reference.get_continuation_activity() # TODO: activity id as parameter requires library update - continuation_activity.id = conversation_id + "|0001022" + continuation_activity.id = conversation_id + "|0000001" async def _callback(turn_context: TurnContext) -> None: await turn_context.send_activity(MessageFactory.text(message)) @@ -187,20 +187,21 @@ async def _callback(turn_context: TurnContext) -> None: await self._adapter.continue_conversation_with_claims( self._create_identity( self._settings.bot_id, - self._settings.agent_id, + self._settings.service_url, + "https://api.botframework.com", ), continuation_activity, _callback, - "https://api.botframework.com", ) @staticmethod - def _create_identity(audience: str, app_id: str) -> ClaimsIdentity: + def _create_identity(audience: str, service_url: str, issuer: str) -> ClaimsIdentity: """Create a claims identity for proactive messaging.""" return ClaimsIdentity( claims={ AuthenticationConstants.AUDIENCE_CLAIM: audience, - AuthenticationConstants.APP_ID_CLAIM: app_id, + AuthenticationConstants.SERVICE_URL_CLAIM: service_url, + AuthenticationConstants.ISSUER_CLAIM: issuer, }, is_authenticated=True, ) From 09f925da612cea61b7cc9c10c2abbb573ae3235f Mon Sep 17 00:00:00 2001 From: Axel Suarez Martinez Date: Fri, 31 Oct 2025 00:25:19 -0700 Subject: [PATCH 3/3] Small fixes for proactive with experimental sample --- .../activity/channel_account.py | 12 +- .../activity/conversation_account.py | 8 +- .../core/authorization/claims_identity.py | 6 +- .../hosting/core/channel_service_adapter.py | 5 +- .../app_style/echo_proactive_agent.py | 288 +++++++------ .../app_style/proactive_messaging_agent.py | 390 ------------------ test_samples/fastapi/shared/__init__.py | 2 +- 7 files changed, 180 insertions(+), 531 deletions(-) delete mode 100644 test_samples/app_style/proactive_messaging_agent.py diff --git a/libraries/microsoft-agents-activity/microsoft_agents/activity/channel_account.py b/libraries/microsoft-agents-activity/microsoft_agents/activity/channel_account.py index c00e5ad7..cb0b7055 100644 --- a/libraries/microsoft-agents-activity/microsoft_agents/activity/channel_account.py +++ b/libraries/microsoft-agents-activity/microsoft_agents/activity/channel_account.py @@ -1,7 +1,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from typing import Any +from typing import Any, Optional from pydantic import ConfigDict from .agents_model import AgentsModel @@ -27,11 +27,11 @@ class ChannelAccount(AgentsModel): id: NonEmptyString = None name: str = None - aad_object_id: NonEmptyString = None - role: NonEmptyString = None - agentic_user_id: NonEmptyString = None - agentic_app_id: NonEmptyString = None - tenant_id: NonEmptyString = None + aad_object_id: Optional[NonEmptyString] = None + role: Optional[NonEmptyString] = None + agentic_user_id: Optional[NonEmptyString] = None + agentic_app_id: Optional[NonEmptyString] = None + tenant_id: Optional[NonEmptyString] = None @property def properties(self) -> dict[str, Any]: diff --git a/libraries/microsoft-agents-activity/microsoft_agents/activity/conversation_account.py b/libraries/microsoft-agents-activity/microsoft_agents/activity/conversation_account.py index 80a8bfa2..93813146 100644 --- a/libraries/microsoft-agents-activity/microsoft_agents/activity/conversation_account.py +++ b/libraries/microsoft-agents-activity/microsoft_agents/activity/conversation_account.py @@ -31,11 +31,11 @@ class ConversationAccount(AgentsModel): :type properties: object """ - is_group: bool = None + is_group: Optional[bool] = None conversation_type: NonEmptyString = None id: NonEmptyString - name: NonEmptyString = None - aad_object_id: NonEmptyString = None - role: NonEmptyString = None + name: Optional[NonEmptyString] = None + aad_object_id: Optional[NonEmptyString] = None + role: Optional[NonEmptyString] = None tenant_id: Optional[NonEmptyString] = None properties: object = None diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/authorization/claims_identity.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/authorization/claims_identity.py index 94f77225..5cd8f089 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/authorization/claims_identity.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/authorization/claims_identity.py @@ -78,4 +78,8 @@ def get_token_audience(self) -> str: :return: The token audience. """ - return f"app://{self.get_outgoing_app_id()}" if self.is_agent_claim() else AuthenticationConstants.AGENTS_SDK_SCOPE + return ( + f"app://{self.get_outgoing_app_id()}" + if self.is_agent_claim() + else AuthenticationConstants.AGENTS_SDK_SCOPE + ) diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py index e9489971..60cf0e79 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py @@ -232,7 +232,10 @@ async def continue_conversation_with_claims( :type audience: Optional[str] """ return await self.process_proactive( - claims_identity, continuation_activity, audience or claims_identity.get_token_audience(), callback + claims_identity, + continuation_activity, + audience or claims_identity.get_token_audience(), + callback, ) async def create_conversation( # pylint: disable=arguments-differ diff --git a/test_samples/app_style/echo_proactive_agent.py b/test_samples/app_style/echo_proactive_agent.py index ada6f802..ed91ded4 100644 --- a/test_samples/app_style/echo_proactive_agent.py +++ b/test_samples/app_style/echo_proactive_agent.py @@ -1,49 +1,65 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. +"""Echo skill sample that mirrors the Copilot Studio EchoSkill agent.""" from __future__ import annotations +import json import logging from dataclasses import dataclass from os import environ, path -from typing import Any +from typing import Any, Dict, Optional +from aiohttp import web from dotenv import load_dotenv from microsoft_agents.activity import ( + load_configuration_from_env, Activity, - ActivityTypes, ConversationReference, EndOfConversationCodes, - MessageFactory, - load_configuration_from_env, ) from microsoft_agents.authentication.msal import MsalConnectionManager -from microsoft_agents.hosting.aiohttp import CloudAdapter +from microsoft_agents.hosting.aiohttp import CloudAdapter, start_agent_process from microsoft_agents.hosting.core import ( AgentApplication, Authorization, - ConversationState, MemoryStorage, + MessageFactory, TurnContext, TurnState, ) -from microsoft_agents.hosting.core.app._routes import RouteRank from microsoft_agents.hosting.core.authorization import ClaimsIdentity from microsoft_agents.hosting.core.storage import StoreItem -from shared import start_server -logger = logging.getLogger(__name__) +@dataclass +class SendActivityRequest: + """Request payload used to resume conversations proactively.""" + + conversation_id: str + message: str + + @classmethod + def from_dict(cls, payload: Dict[str, Any]) -> "SendActivityRequest": + conversation_id = payload.get("conversationId") or payload.get( + "conversation_id" + ) + if not conversation_id: + raise ValueError("conversationId is required.") + + message = payload.get("message") + if not message: + raise ValueError("message is required.") + + return cls(conversation_id=conversation_id, message=message) @dataclass class ConversationReferenceRecord(StoreItem): - """Persistable envelope for a conversation reference and associated identity.""" + """Persistent envelope for a conversation reference and associated identity.""" claims: dict[str, str] is_authenticated: bool - authentication_type: str | None + authentication_type: Optional[str] reference: ConversationReference @staticmethod @@ -72,7 +88,7 @@ def to_identity(self) -> ClaimsIdentity: authentication_type=self.authentication_type, ) - def store_item_to_json(self) -> dict[str, Any]: + def store_item_to_json(self) -> Dict[str, Any]: return { "claims": dict(self.claims), "is_authenticated": self.is_authenticated, @@ -82,12 +98,15 @@ def store_item_to_json(self) -> dict[str, Any]: @staticmethod def from_json_to_store_item( - json_data: dict[str, Any] + json_data: Dict[str, Any], ) -> "ConversationReferenceRecord": - reference_data = json_data.get("reference") - if not reference_data: + reference_payload = json_data.get("reference") + if not reference_payload: raise ValueError("Conversation reference payload is missing.") - reference = ConversationReference.model_validate(reference_data) + + reference = ConversationReference.model_validate( + reference_payload, strict=False + ) return ConversationReferenceRecord( claims=json_data.get("claims", {}), is_authenticated=json_data.get("is_authenticated", False), @@ -96,95 +115,65 @@ def from_json_to_store_item( ) -class EchoSkill(AgentApplication[TurnState]): - """AgentApplication equivalent of the Copilot Studio Echo skill sample.""" +load_dotenv(path.join(path.dirname(__file__), ".env")) +agents_sdk_config = load_configuration_from_env(environ) - def __init__( - self, - *, - storage: MemoryStorage, - adapter: CloudAdapter, - authorization: Authorization, - connection_manager: MsalConnectionManager, - **options: Any, - ) -> None: - super().__init__( - connection_manager=connection_manager, - storage=storage, - adapter=adapter, - authorization=authorization, - **options, - ) +storage = MemoryStorage() +connection_manager = MsalConnectionManager(**agents_sdk_config) +adapter = CloudAdapter(connection_manager=connection_manager) +authorization = Authorization(storage, connection_manager, **agents_sdk_config) +AGENT_APP = AgentApplication[TurnState]( + storage=storage, + adapter=adapter, + authorization=authorization, + **agents_sdk_config.get("AGENTAPPLICATION", {}), +) - self._storage = storage - self._conversation_state = ConversationState(storage) - self.conversation_update("membersAdded")(self._welcome_message) - self.activity(ActivityTypes.end_of_conversation)( - self._handle_end_of_conversation - ) - self.activity(ActivityTypes.message, rank=RouteRank.LAST)(self._on_message) - self.error(self._handle_turn_error) - - async def _welcome_message( - self, context: TurnContext, _: TurnState - ) -> None: # pragma: no cover - sample behaviour - for member in context.activity.members_added or []: - recipient_id = context.activity.recipient.id if context.activity.recipient else None - if member.id != recipient_id: - await context.send_activity("Hi, This is EchoSkill") - - async def _handle_end_of_conversation( - self, context: TurnContext, state: TurnState - ) -> None: +@AGENT_APP.activity("message") +async def on_message(context: TurnContext, state: TurnState) -> None: + text = context.activity.text or "" + if "end" == text: + await context.send_activity("(EchoSkill) Ending conversation...") + end_activity = Activity.create_end_of_conversation_activity() + end_activity.code = EndOfConversationCodes.completed_successfully + await context.send_activity(end_activity) await state.conversation.delete(context) conversation = context.activity.conversation if conversation and conversation.id: - await self._storage.delete([ConversationReferenceRecord.get_key(conversation.id)]) - - async def _on_message(self, context: TurnContext, state: TurnState) -> None: - text = context.activity.text or "" - if "end" in text: - await context.send_activity("(EchoSkill) Ending conversation...") - end_activity = Activity.create_end_of_conversation_activity() - end_activity.code = EndOfConversationCodes.completed_successfully - await context.send_activity(end_activity) - await state.conversation.delete(context) - return - - record = ConversationReferenceRecord.from_context(context) - await self._storage.write({record.key: record}) # Keep reference for proactive sends. - - await context.send_activity(MessageFactory.text(f"(EchoSkill): {text}")) - await context.send_activity( - MessageFactory.text('(EchoSkill): Say "end" and I\'ll end the conversation.') - ) - await state.save(context) + await state.conversation._storage.delete( + [ConversationReferenceRecord.get_key(conversation.id)] + ) + return - async def _handle_turn_error( - self, context: TurnContext, error: Exception - ) -> None: # pragma: no cover - sample logging - logger.exception("Unhandled error during turn", exc_info=error) + logging.info( + f"(EchoSkill) ConversationReference to save: {context.activity.get_conversation_reference().model_dump(mode='json', exclude_unset=True, by_alias=True)} with message: {text}" + ) + record = ConversationReferenceRecord.from_context(context) + await state.conversation._storage.write({record.key: record}) - await self._conversation_state.load(context) - await self._conversation_state.delete(context) + await context.send_activity(MessageFactory.text(f"(EchoSkill): {text}")) - await context.send_activity( - MessageFactory.text(f"Error during turn, ending conversation ({error})") - ) - end_activity = Activity.create_end_of_conversation_activity() - end_activity.code = EndOfConversationCodes.error - end_activity.text = str(error) - await context.send_activity(end_activity) + +class EchoSkillService: + def __init__( + self, + storage: MemoryStorage, + adapter: CloudAdapter, + ) -> None: + self._storage = storage + self._adapter = adapter async def send_activity_to_conversation( - self, conversation_id: str, activity: Activity + self, conversation_id: str, message: str ) -> bool: - if not conversation_id or not activity: + if not conversation_id: return False key = ConversationReferenceRecord.get_key(conversation_id) - items = await self._storage.read([key], target_cls=ConversationReferenceRecord) + items: Dict[str, ConversationReferenceRecord] = await self._storage.read( + [key], target_cls=ConversationReferenceRecord + ) record = items.get(key) if not record: return False @@ -192,61 +181,104 @@ async def send_activity_to_conversation( continuation_activity = record.reference.get_continuation_activity() async def _callback(turn_context: TurnContext) -> None: - await turn_context.send_activity(activity) + await turn_context.send_activity(message) - await self.adapter.continue_conversation_with_claims( + await self._adapter.continue_conversation_with_claims( record.to_identity(), continuation_activity, _callback ) return True -@dataclass -class SendActivityRequest: - conversation_id: str - activity: Activity +async def _read_optional_json(request: web.Request) -> Dict[str, Any]: + if request.content_length in (0, None): + return {} + try: + return await request.json() + except json.JSONDecodeError: + return {} - @classmethod - def from_dict(cls, payload: dict[str, Any]) -> "SendActivityRequest": - conversation_id = payload.get("conversationId") or payload.get("conversation_id") - activity_payload = payload.get("activity") - if not conversation_id or not activity_payload: - raise ValueError("conversationId and activity are required.") - activity = Activity.model_validate(activity_payload) - return cls(conversation_id=conversation_id, activity=activity) +def create_app() -> web.Application: + """Create and configure the aiohttp application hosting the sample.""" -load_dotenv(path.join(path.dirname(__file__), ".env")) + load_dotenv(path.join(path.dirname(__file__), ".env")) -agents_sdk_config = load_configuration_from_env(environ) + echo_service = EchoSkillService(storage, adapter) + global SERVICE_INSTANCE + SERVICE_INSTANCE = echo_service -STORAGE = MemoryStorage() -CONNECTION_MANAGER = MsalConnectionManager(**agents_sdk_config) -ADAPTER = CloudAdapter(connection_manager=CONNECTION_MANAGER) -AUTHORIZATION = Authorization(STORAGE, CONNECTION_MANAGER, **agents_sdk_config) + app = web.Application() + app["adapter"] = adapter + app["agent_app"] = AGENT_APP + app["echo_service"] = echo_service + agent_config = connection_manager.get_default_connection_configuration() + if not agent_config: + raise ValueError("SERVICE_CONNECTION settings are missing.") + app["agent_configuration"] = agent_config -ECHO_SKILL = EchoSkill( - storage=STORAGE, - adapter=ADAPTER, - authorization=AUTHORIZATION, - connection_manager=CONNECTION_MANAGER, - **agents_sdk_config.get("AGENTAPPLICATION", {}), -) + app.router.add_get("/", _handle_root) + app.router.add_post("/api/messages", _agent_entry_point) + app.router.add_post("/api/sendactivity", _handle_send_activity) + + return app + + +async def _handle_root(request: web.Request) -> web.Response: + return web.json_response({"status": "ready", "sample": "echo-skill"}) + + +async def _agent_entry_point(request: web.Request) -> web.Response: + agent_app: AgentApplication = request.app["agent_app"] + adapter: CloudAdapter = request.app["adapter"] + response = await start_agent_process(request, agent_app, adapter) + return response or web.Response(status=202) + + +async def _handle_send_activity(request: web.Request) -> web.Response: + service: EchoSkillService = request.app["echo_service"] + payload = await _read_optional_json(request) + try: + send_request = SendActivityRequest.from_dict(payload) + except ValueError as exc: + return web.json_response( + { + "status": "Error", + "error": {"code": "Validation", "message": str(exc)}, + }, + status=400, + ) + + success = await service.send_activity_to_conversation( + send_request.conversation_id, send_request.message + ) -async def send_activity_to_conversation( - conversation_id: str, activity: Activity -) -> bool: - """Module-level convenience wrapper matching the C# sample signature.""" + if not success: + return web.json_response( + { + "status": "Error", + "error": { + "code": "NotFound", + "message": "Conversation reference not found.", + }, + }, + status=404, + ) - return await ECHO_SKILL.send_activity_to_conversation(conversation_id, activity) + return web.json_response( + {"status": "Delivered", "conversationId": send_request.conversation_id}, + status=202, + ) def main() -> None: logging.basicConfig(level=logging.INFO) - start_server( - agent_application=ECHO_SKILL, - auth_configuration=CONNECTION_MANAGER.get_default_connection_configuration(), - ) + app = create_app() + + host = environ.get("HOST", "localhost") + port = int(environ.get("PORT", "3978")) + + web.run_app(app, host=host, port=port) if __name__ == "__main__": diff --git a/test_samples/app_style/proactive_messaging_agent.py b/test_samples/app_style/proactive_messaging_agent.py deleted file mode 100644 index 36e37cf2..00000000 --- a/test_samples/app_style/proactive_messaging_agent.py +++ /dev/null @@ -1,390 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -"""Proactive messaging sample demonstrating how to start and continue a Teams conversation.""" - -from __future__ import annotations - -import json -import logging -from dataclasses import dataclass -from os import environ, path -from typing import Any, Dict, Optional - -from aiohttp import web -from dotenv import load_dotenv - -from microsoft_agents.activity import ( - ChannelAccount, - ConversationAccount, - ConversationParameters, - ConversationReference, - # load_configuration_from_env, -) -from microsoft_agents.authentication.msal import MsalConnectionManager -from microsoft_agents.hosting.aiohttp import CloudAdapter, jwt_authorization_middleware -from microsoft_agents.hosting.core import ( - AgentApplication, - Authorization, - AuthenticationConstants, - ClaimsIdentity, - MessageFactory, - MemoryStorage, - TurnContext, - TurnState, -) - - -# TODO: libraries will be updated to add current changes to this method -def load_configuration_from_env( - env_vars: dict[str, Any], custom_config_keys: list[str] = None -) -> dict: - """ - Parses environment variables and returns a dictionary with the relevant configuration. - """ - custom_config_keys = custom_config_keys or [] - vars = env_vars.copy() - result = {} - for key, value in vars.items(): - levels = key.split("__") - current_level = result - last_level = None - for next_level in levels: - if next_level not in current_level: - current_level[next_level] = {} - last_level = current_level - current_level = current_level[next_level] - last_level[levels[-1]] = value - - if result.get("CONNECTIONSMAP") and isinstance(result["CONNECTIONSMAP"], dict): - result["CONNECTIONSMAP"] = [ - conn for conn in result.get("CONNECTIONSMAP", {}).values() - ] - - configuration = { - "AGENTAPPLICATION": result.get("AGENTAPPLICATION", {}), - "CONNECTIONS": result.get("CONNECTIONS", {}), - "CONNECTIONSMAP": result.get("CONNECTIONSMAP", {}), - } - configuration.update( - {key: result[key] for key in custom_config_keys if key in result} - ) - - return configuration - - -@dataclass -class ProactiveMessagingSettings: - """Configuration required to perform proactive operations.""" - - bot_id: str - agent_id: str - tenant_id: str - scope: str = "https://api.botframework.com/.default" - channel_id: str = "msteams" - service_url: str = "https://smba.trafficmanager.net/teams/" - default_user_aad_object_id: Optional[str] = None - - @classmethod - def from_dict(cls, raw: Dict[str, Any]) -> "ProactiveMessagingSettings": - try: - return cls( - bot_id=raw["BOTID"], - agent_id=raw["AGENTID"], - tenant_id=raw["TENANTID"], - scope=raw.get("SCOPE", cls.scope), - channel_id=raw.get("CHANNELID", cls.channel_id), - service_url=raw.get("SERVICEURL", cls.service_url), - default_user_aad_object_id=raw.get("USERAADOBJECTID"), - ) - except KeyError as exc: # pragma: no cover - defensive guard - missing = exc.args[0] - raise ValueError( - f"Missing required PROACTIVEMESSAGING__{missing} environment variable" - ) from exc - - -class ProactiveMessenger: - """Encapsulates proactive conversation helpers for Teams.""" - - def __init__( - self, - adapter: CloudAdapter, - settings: ProactiveMessagingSettings, - ) -> None: - self._adapter = adapter - self._settings = settings - - async def create_conversation( - self, - message: Optional[str], - user_aad_object_id: Optional[str], - ) -> str: - """Create a new conversation and optionally seed it with an initial message.""" - - effective_user = user_aad_object_id or self._settings.default_user_aad_object_id - if not effective_user: - raise ValueError( - "UserAadObjectId is required when no default is configured." - ) - - members = [ChannelAccount(id=effective_user)] - parameters = ConversationParameters( - is_group=False, - agent=ChannelAccount(id=self._settings.agent_id), - members=members, - tenant_id=self._settings.tenant_id, - ) - - conversation_id: Optional[str] = None - - async def _callback(turn_context: TurnContext) -> None: - nonlocal conversation_id - conversation_id = turn_context.activity.conversation.id - if message: - await turn_context.send_activity(MessageFactory.text(message)) - - await self._adapter.create_conversation( - self._settings.bot_id, - self._settings.channel_id, - self._settings.service_url, - self._settings.scope, - parameters, - _callback, - ) - - if not conversation_id: - raise RuntimeError("Conversation id was not returned by the channel.") - - return conversation_id - - async def send_message(self, conversation_id: str, message: str) -> None: - """Send a proactive message into an existing conversation.""" - - if not conversation_id: - raise ValueError("conversationId is required.") - if not message: - raise ValueError("message is required.") - - # TODO: user field not really needed but requires library update - reference = ConversationReference( - channel_id=self._settings.channel_id, - service_url=self._settings.service_url, - agent=ChannelAccount(id="<>", name="<>"), - conversation=ConversationAccount( - id=conversation_id - ), - user=ChannelAccount(id="user_id", name="user_name"), - ) - continuation_activity = reference.get_continuation_activity() - - # TODO: activity id as parameter requires library update - continuation_activity.id = conversation_id + "|0000001" - - async def _callback(turn_context: TurnContext) -> None: - await turn_context.send_activity(MessageFactory.text(message)) - - await self._adapter.continue_conversation_with_claims( - self._create_identity( - self._settings.bot_id, - self._settings.service_url, - "https://api.botframework.com", - ), - continuation_activity, - _callback, - ) - - @staticmethod - def _create_identity(audience: str, service_url: str, issuer: str) -> ClaimsIdentity: - """Create a claims identity for proactive messaging.""" - return ClaimsIdentity( - claims={ - AuthenticationConstants.AUDIENCE_CLAIM: audience, - AuthenticationConstants.SERVICE_URL_CLAIM: service_url, - AuthenticationConstants.ISSUER_CLAIM: issuer, - }, - is_authenticated=True, - ) - - -async def _read_optional_json(request: web.Request) -> Dict[str, Any]: - if request.content_length in (0, None): - return {} - try: - return await request.json() - except json.JSONDecodeError: - return {} - - -def create_app() -> web.Application: - """Create and configure the aiohttp application hosting the sample.""" - - load_dotenv(path.join(path.dirname(__file__), ".env")) - - # Load base Agents SDK configuration that the core components expect. - agents_sdk_config = load_configuration_from_env( - environ, custom_config_keys=["PROACTIVEMESSAGING"] - ) - - storage = MemoryStorage() - connection_manager = MsalConnectionManager(**agents_sdk_config) - adapter = CloudAdapter(connection_manager=connection_manager) - authorization = Authorization(storage, connection_manager, **agents_sdk_config) - agent_app = AgentApplication[TurnState]( - storage=storage, - adapter=adapter, - authorization=authorization, - **agents_sdk_config.get("AGENTAPPLICATION", {}), - ) - adapter.on_turn_error = _default_error_handler - - proactive_raw = agents_sdk_config.get("PROACTIVEMESSAGING", {}) - settings = ProactiveMessagingSettings.from_dict(proactive_raw) - messenger = ProactiveMessenger(adapter, settings) - - middlewares = [] - agent_config = connection_manager.get_default_connection_configuration() - if not agent_config: - raise ValueError("SERVICE_CONNECTION settings are missing.") - # middlewares.append(jwt_authorization_middleware) - - app = web.Application(middlewares=middlewares) - app["adapter"] = adapter - app["agent_app"] = agent_app - app["messenger"] = messenger - app["agent_configuration"] = agent_config - - app.router.add_get("/", _handle_root) - app.router.add_get("/healthz", _handle_health) - app.router.add_post("/api/createconversation", _handle_create_conversation) - app.router.add_post("/api/sendmessage", _handle_send_message) - - return app - - -async def _handle_root(request: web.Request) -> web.Response: - return web.json_response({"status": "ready", "sample": "proactive-messaging"}) - - -async def _handle_health(request: web.Request) -> web.Response: - return web.json_response({"status": "ok"}) - - -async def _handle_create_conversation(request: web.Request) -> web.Response: - messenger: ProactiveMessenger = request.app["messenger"] - payload = await _read_optional_json(request) - - message = payload.get("message") or payload.get("Message") - user_aad_object_id = payload.get("userAadObjectId") or payload.get( - "UserAadObjectId" - ) - - try: - conversation_id = await messenger.create_conversation( - message=message, user_aad_object_id=user_aad_object_id - ) - except ValueError as exc: - return web.json_response( - { - "status": "Error", - "error": { - "code": "Validation", - "message": str(exc), - }, - }, - status=400, - ) - except Exception as exc: # pragma: no cover - sample logging - logging.exception("Create conversation failed") - return web.json_response( - { - "status": "Error", - "error": { - "code": "ServerError", - "message": str(exc), - }, - }, - status=500, - ) - - return web.json_response( - { - "conversationId": conversation_id, - "status": "Created", - }, - status=201, - ) - - -async def _handle_send_message(request: web.Request) -> web.Response: - messenger: ProactiveMessenger = request.app["messenger"] - payload = await _read_optional_json(request) - - conversation_id = payload.get("conversationId") or payload.get("ConversationId") - message = payload.get("message") or payload.get("Message") - - if not conversation_id or not message: - return web.json_response( - { - "status": "Error", - "error": { - "code": "Validation", - "message": "conversationId and message are required", - }, - }, - status=400, - ) - - try: - await messenger.send_message(conversation_id, message) - except ValueError as exc: - return web.json_response( - { - "status": "Error", - "error": { - "code": "Validation", - "message": str(exc), - }, - }, - status=400, - ) - except Exception as exc: # pragma: no cover - sample logging - logging.exception("Send message failed") - return web.json_response( - { - "status": "Error", - "error": { - "code": "ServerError", - "message": str(exc), - }, - }, - status=500, - ) - - return web.json_response( - { - "conversationId": conversation_id, - "status": "Delivered", - } - ) - - -async def _default_error_handler(context: TurnContext, error: Exception) -> None: - logging.exception("Adapter pipeline failure", exc_info=error) - await context.send_activity( - "The proactive messaging agent encountered an unexpected error." - ) - - -def main() -> None: - logging.basicConfig(level=logging.INFO) - app = create_app() - - host = environ.get("HOST", "localhost") - port = int(environ.get("PORT", "5199")) - - web.run_app(app, host=host, port=port) - - -if __name__ == "__main__": - main() diff --git a/test_samples/fastapi/shared/__init__.py b/test_samples/fastapi/shared/__init__.py index be7909b4..3d625f30 100644 --- a/test_samples/fastapi/shared/__init__.py +++ b/test_samples/fastapi/shared/__init__.py @@ -11,5 +11,5 @@ "get_pull_requests", "get_user_info", "create_profile_card", - "create_pr_card" + "create_pr_card", ]