diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py index 244ebcdb..dc27a76f 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_adapter.py @@ -213,21 +213,21 @@ async def create_conversation( # pylint: disable=arguments-differ claims_identity = self.create_claims_identity(agent_app_id) claims_identity.claims[AuthenticationConstants.SERVICE_URL_CLAIM] = service_url - # Create a UserTokenClient instance for the application to use. (For example, in the OAuthPrompt.) - user_token_client: UserTokenClient = ( - await self._channel_service_client_factory.create_user_token_client( - claims_identity - ) - ) - # Create a turn context and run the pipeline. context = self._create_turn_context( claims_identity, None, - user_token_client, callback, ) + # Create a UserTokenClient instance for the application to use. (For example, in the OAuthPrompt.) + user_token_client: UserTokenClient = ( + await self._channel_service_client_factory.create_user_token_client( + context, claims_identity + ) + ) + context.turn_state[self.USER_TOKEN_CLIENT_KEY] = user_token_client + # Create the connector client to use for outbound requests. connector_client: ConnectorClient = ( await self._channel_service_client_factory.create_connector_client( @@ -264,22 +264,21 @@ async def process_proactive( callback: Callable[[TurnContext], Awaitable], ): - # Create a UserTokenClient instance for the application to use. (For example, in the OAuthPrompt.) - user_token_client: UserTokenClient = ( - await self._channel_service_client_factory.create_user_token_client( - claims_identity - ) - ) - # Create a turn context and run the pipeline. context = self._create_turn_context( claims_identity, audience, - user_token_client, callback, activity=continuation_activity, ) + user_token_client: UserTokenClient = ( + await self._channel_service_client_factory.create_user_token_client( + context, claims_identity + ) + ) + context.turn_state[self.USER_TOKEN_CLIENT_KEY] = user_token_client + # Create the connector client to use for outbound requests. connector_client: ConnectorClient = ( await self._channel_service_client_factory.create_connector_client( @@ -338,22 +337,22 @@ async def process_activity( ): use_anonymous_auth_callback = True - # Create a UserTokenClient instance for the OAuth flow. - user_token_client: UserTokenClient = ( - await self._channel_service_client_factory.create_user_token_client( - claims_identity, use_anonymous_auth_callback - ) - ) - # Create a turn context and run the pipeline. context = self._create_turn_context( claims_identity, outgoing_audience, - user_token_client, callback, activity=activity, ) + # Create a UserTokenClient instance for the OAuth flow. + user_token_client: UserTokenClient = ( + await self._channel_service_client_factory.create_user_token_client( + context, claims_identity, use_anonymous_auth_callback + ) + ) + context.turn_state[self.USER_TOKEN_CLIENT_KEY] = user_token_client + # Create the connector client to use for outbound requests. connector_client: ConnectorClient = ( await self._channel_service_client_factory.create_connector_client( @@ -425,14 +424,12 @@ def _create_turn_context( self, claims_identity: ClaimsIdentity, oauth_scope: str, - user_token_client: UserTokenClientBase, callback: Callable[[TurnContext], Awaitable], activity: Optional[Activity] = None, ) -> TurnContext: context = TurnContext(self, activity, claims_identity) context.turn_state[self.AGENT_IDENTITY_KEY] = claims_identity - context.turn_state[self.USER_TOKEN_CLIENT_KEY] = user_token_client context.turn_state[self.AGENT_CALLBACK_HANDLER_KEY] = callback context.turn_state[self.CHANNEL_SERVICE_FACTORY_KEY] = ( self._channel_service_client_factory diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_client_factory_base.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_client_factory_base.py index d311781d..e325653c 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_client_factory_base.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/channel_service_client_factory_base.py @@ -6,12 +6,14 @@ ConnectorClientBase, UserTokenClientBase, ) +from microsoft_agents.hosting.core.turn_context import TurnContext class ChannelServiceClientFactoryBase(Protocol): @abstractmethod async def create_connector_client( self, + context: TurnContext, claims_identity: ClaimsIdentity, service_url: str, audience: str, @@ -32,7 +34,10 @@ async def create_connector_client( @abstractmethod async def create_user_token_client( - self, claims_identity: ClaimsIdentity, use_anonymous: bool = False + self, + context: TurnContext, + claims_identity: ClaimsIdentity, + use_anonymous: bool = False, ) -> UserTokenClientBase: """ Creates the appropriate UserTokenClientBase instance. diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/connector/client/connector_client.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/connector/client/connector_client.py index f713fadf..8156ea58 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/connector/client/connector_client.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/connector/client/connector_client.py @@ -122,8 +122,12 @@ async def get_attachment(self, attachment_id: str, view_id: str) -> BytesIO: class ConversationsOperations(ConversationsBase): - def __init__(self, client: ClientSession): + def __init__(self, client: ClientSession, **kwargs): self.client = client + self._max_conversation_id_length = kwargs.get("max_conversation_id_length", 200) + + def _normalize_conversation_id(self, conversation_id: str) -> str: + return conversation_id[: self._max_conversation_id_length] async def get_conversations( self, continuation_token: Optional[str] = None @@ -193,11 +197,16 @@ async def reply_to_activity( ) raise ValueError("conversationId and activityId are required") + print("\n*3") + print(conversation_id) + print("\n*3") + conversation_id = self._normalize_conversation_id(conversation_id) url = f"v3/conversations/{conversation_id}/activities/{activity_id}" logger.info( f"Replying to activity: {activity_id} in conversation: {conversation_id}. Activity type is {body.type}" ) + async with self.client.post( url, json=body.model_dump( @@ -216,7 +225,8 @@ async def reply_to_activity( logger.info( f"Reply to conversation/activity: {result.get('id')}, {activity_id}" ) - return ResourceResponse.model_validate(result) + + return ResourceResponse.model_validate(result) async def send_to_conversation( self, conversation_id: str, body: Activity @@ -235,6 +245,7 @@ async def send_to_conversation( ) raise ValueError("conversationId is required") + conversation_id = self._normalize_conversation_id(conversation_id) url = f"v3/conversations/{conversation_id}/activities" logger.info( @@ -271,6 +282,7 @@ async def update_activity( ) raise ValueError("conversationId and activityId are required") + conversation_id = self._normalize_conversation_id(conversation_id) url = f"v3/conversations/{conversation_id}/activities/{activity_id}" logger.info( @@ -303,6 +315,7 @@ async def delete_activity(self, conversation_id: str, activity_id: str) -> None: ) raise ValueError("conversationId and activityId are required") + conversation_id = self._normalize_conversation_id(conversation_id) url = f"v3/conversations/{conversation_id}/activities/{activity_id}" logger.info( @@ -332,6 +345,7 @@ async def upload_attachment( ) raise ValueError("conversationId is required") + conversation_id = self._normalize_conversation_id(conversation_id) url = f"v3/conversations/{conversation_id}/attachments" # Convert the AttachmentData to a dictionary @@ -371,6 +385,7 @@ async def get_conversation_members( ) raise ValueError("conversationId is required") + conversation_id = self._normalize_conversation_id(conversation_id) url = f"v3/conversations/{conversation_id}/members" logger.info(f"Getting conversation members for conversation: {conversation_id}") @@ -402,6 +417,7 @@ async def get_conversation_member( ) raise ValueError("conversationId and memberId are required") + conversation_id = self._normalize_conversation_id(conversation_id) url = f"v3/conversations/{conversation_id}/members/{member_id}" logger.info( @@ -434,6 +450,7 @@ async def delete_conversation_member( ) raise ValueError("conversationId and memberId are required") + conversation_id = self._normalize_conversation_id(conversation_id) url = f"v3/conversations/{conversation_id}/members/{member_id}" logger.info( @@ -464,6 +481,7 @@ async def get_activity_members( ) raise ValueError("conversationId and activityId are required") + conversation_id = self._normalize_conversation_id(conversation_id) url = f"v3/conversations/{conversation_id}/activities/{activity_id}/members" logger.info( @@ -507,6 +525,7 @@ async def get_conversation_paged_members( if continuation_token is not None: params["continuationToken"] = continuation_token + conversation_id = self._normalize_conversation_id(conversation_id) url = f"v3/conversations/{conversation_id}/pagedmembers" logger.info( @@ -540,6 +559,7 @@ async def send_conversation_history( ) raise ValueError("conversationId is required") + conversation_id = self._normalize_conversation_id(conversation_id) url = f"v3/conversations/{conversation_id}/activities/history" logger.info(f"Sending conversation history to conversation: {conversation_id}") diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/rest_channel_service_client_factory.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/rest_channel_service_client_factory.py index 7c444d89..10aecc70 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/rest_channel_service_client_factory.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/rest_channel_service_client_factory.py @@ -1,4 +1,3 @@ -import re from typing import Optional import logging @@ -33,6 +32,52 @@ def __init__( self._token_service_endpoint = token_service_endpoint self._token_service_audience = token_service_audience + async def _get_agentic_token(self, context: TurnContext, service_url: str) -> str: + logger.info( + "Creating connector client for agentic request to service_url: %s", + service_url, + ) + + if not context.identity: + raise ValueError("context.identity is required for agentic activities") + + connection = self._connection_manager.get_token_provider( + context.identity, service_url + ) + if not hasattr(connection, "_msal_configuration"): + raise TypeError( + "Connection does not support MSAL configuration for agentic token retrieval" + ) + + if connection._msal_configuration.ALT_BLUEPRINT_ID: + logger.debug( + "Using alternative blueprint ID for agentic token retrieval: %s", + connection._msal_configuration.ALT_BLUEPRINT_ID, + ) + connection = self._connection_manager.get_connection( + connection._msal_configuration.ALT_BLUEPRINT_ID + ) + + agent_instance_id = context.activity.get_agentic_instance_id() + if not agent_instance_id: + raise ValueError("Agent instance ID is required for agentic identity role") + + if context.activity.recipient.role == RoleTypes.agentic_identity: + token, _ = await connection.get_agentic_instance_token(agent_instance_id) + else: + agentic_user = context.activity.get_agentic_user() + if not agentic_user: + raise ValueError("Agentic user is required for agentic user role") + token = await connection.get_agentic_user_token( + agent_instance_id, + agentic_user, + [AuthenticationConstants.APX_PRODUCTION_SCOPE], + ) + + if not token: + raise ValueError("Failed to obtain token for agentic activity") + return token + async def create_connector_client( self, context: TurnContext, @@ -42,6 +87,8 @@ async def create_connector_client( scopes: Optional[list[str]] = None, use_anonymous: bool = False, ) -> ConnectorClientBase: + if not context or not claims_identity: + raise TypeError("context and claims_identity are required") if not service_url: raise TypeError( "RestChannelServiceClientFactory.create_connector_client: service_url can't be None or Empty" @@ -52,50 +99,7 @@ async def create_connector_client( ) if context.activity.is_agentic_request(): - logger.info( - "Creating connector client for agentic request to service_url: %s", - service_url, - ) - - if not context.identity: - raise ValueError("context.identity is required for agentic activities") - - connection = self._connection_manager.get_token_provider( - context.identity, service_url - ) - - # TODO: clean up linter - if connection._msal_configuration.ALT_BLUEPRINT_ID: - logger.debug( - "Using alternative blueprint ID for agentic token retrieval: %s", - connection._msal_configuration.ALT_BLUEPRINT_ID, - ) - connection = self._connection_manager.get_connection( - connection._msal_configuration.ALT_BLUEPRINT_ID - ) - - agent_instance_id = context.activity.get_agentic_instance_id() - if not agent_instance_id: - raise ValueError( - "Agent instance ID is required for agentic identity role" - ) - - if context.activity.recipient.role == RoleTypes.agentic_identity: - token, _ = await connection.get_agentic_instance_token( - agent_instance_id - ) - else: - agentic_user = context.activity.get_agentic_user() - if not agentic_user: - raise ValueError("Agentic user is required for agentic user role") - token = await connection.get_agentic_user_token( - agent_instance_id, - agentic_user, - [AuthenticationConstants.APX_PRODUCTION_SCOPE], - ) - - if not token: - raise ValueError("Failed to obtain token for agentic activity") + token = await self._get_agentic_token(context, service_url) else: token_provider: AccessTokenProviderBase = ( self._connection_manager.get_token_provider( @@ -115,18 +119,40 @@ async def create_connector_client( ) async def create_user_token_client( - self, claims_identity: ClaimsIdentity, use_anonymous: bool = False + self, + context: TurnContext, + claims_identity: ClaimsIdentity, + use_anonymous: bool = False, ) -> UserTokenClient: + """Create a UserTokenClient for the given context and claims identity. + + :param context: The TurnContext for the current turn of conversation. + :param claims_identity: The ClaimsIdentity of the user. + :param use_anonymous: Whether to use an anonymous token provider. + """ + if not context or not claims_identity: + raise ValueError("context and claims_identity are required") + if use_anonymous: return UserTokenClient(endpoint=self._token_service_endpoint, token="") - token_provider = self._connection_manager.get_token_provider( - claims_identity, self._token_service_endpoint - ) + if context.activity.is_agentic_request(): + token = await self._get_agentic_token(context, self._token_service_endpoint) + else: + scopes = [f"{self._token_service_audience}/.default"] + + token_provider = self._connection_manager.get_token_provider( + claims_identity, self._token_service_endpoint + ) + + token = await token_provider.get_access_token( + self._token_service_audience, scopes + ) + + if not token: + logger.error("Failed to obtain token for user token client") + raise ValueError("Failed to obtain token for user token client") - token = await token_provider.get_access_token( - self._token_service_audience, [f"{self._token_service_audience}/.default"] - ) return UserTokenClient( endpoint=self._token_service_endpoint, token=token, diff --git a/test_samples/agentic-test/env.TEMPLATE b/test_samples/agentic-test/env.TEMPLATE new file mode 100644 index 00000000..5ed5fa5e --- /dev/null +++ b/test_samples/agentic-test/env.TEMPLATE @@ -0,0 +1,8 @@ +CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTID= +CONNECTIONS__SERVICE_CONNECTION__SETTINGS__CLIENTSECRET= +CONNECTIONS__SERVICE_CONNECTION__SETTINGS__TENANTID= + +AGENTAPPLICATION__USERAUTHORIZATION__HANDLERS__AGENTIC__SETTINGS__TYPE=AgenticUserAuthorization + +CONNECTIONSMAP_0_SERVICEURL=* +CONNECTIONSMAP_0_CONNECTION=SERVICE_CONNECTION diff --git a/test_samples/agentic-test/requirements.txt b/test_samples/agentic-test/requirements.txt new file mode 100644 index 00000000..2510b84c --- /dev/null +++ b/test_samples/agentic-test/requirements.txt @@ -0,0 +1,6 @@ +python-dotenv +aiohttp +microsoft-agents-hosting-aiohttp +microsoft-agents-hosting-core +microsoft-agents-authentication-msal +microsoft-agents-activity \ No newline at end of file diff --git a/test_samples/agentic-test/src/agent.py b/test_samples/agentic-test/src/agent.py new file mode 100644 index 00000000..efbff47f --- /dev/null +++ b/test_samples/agentic-test/src/agent.py @@ -0,0 +1,53 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import logging +from dotenv import load_dotenv + +from os import environ +from microsoft_agents.hosting.aiohttp import CloudAdapter +from microsoft_agents.hosting.core import ( + Authorization, + AgentApplication, + TurnState, + TurnContext, + MemoryStorage, +) +from microsoft_agents.hosting.core.storage import ( + TranscriptLoggerMiddleware, + ConsoleTranscriptLogger, +) +from microsoft_agents.authentication.msal import MsalConnectionManager +from microsoft_agents.activity import load_configuration_from_env + +logger = logging.getLogger(__name__) + +load_dotenv() # robrandao: todo +agents_sdk_config = load_configuration_from_env(environ) + +STORAGE = MemoryStorage() +CONNECTION_MANAGER = MsalConnectionManager(**agents_sdk_config) +ADAPTER = CloudAdapter(connection_manager=CONNECTION_MANAGER) +ADAPTER.use(TranscriptLoggerMiddleware(ConsoleTranscriptLogger())) +AUTHORIZATION = Authorization(STORAGE, CONNECTION_MANAGER, **agents_sdk_config) + +# robrandao: downloader? +AGENT_APP = AgentApplication[TurnState]( + storage=STORAGE, adapter=ADAPTER, authorization=AUTHORIZATION, **agents_sdk_config +) + + +@AGENT_APP.activity("message", auth_handlers=["AGENTIC"]) +async def on_message(context: TurnContext, _state: TurnState): + aau_token = await AGENT_APP.auth.get_token(context, "AGENTIC") + await context.send_activity( + f"Acquired agentic user token with length: {len(aau_token.token)}" + ) + + +@AGENT_APP.error +async def on_error(context: TurnContext, error: Exception): + logger.error("[on_turn_error] unhandled error: %s", error) + + # Send a message to the user + await context.send_activity("The agent encountered an error.") diff --git a/test_samples/agentic-test/src/main.py b/test_samples/agentic-test/src/main.py new file mode 100644 index 00000000..085b3385 --- /dev/null +++ b/test_samples/agentic-test/src/main.py @@ -0,0 +1,25 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +# enable logging for Microsoft Agents library +# for more information, see README.md for Quickstart Agent +import logging + +ms_agents_logger = logging.getLogger("microsoft_agents") +console_handler = logging.StreamHandler() +console_handler.setFormatter( + logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)" + ) +) +ms_agents_logger.addHandler(console_handler) +ms_agents_logger.setLevel(logging.DEBUG) + + +from .agent import AGENT_APP, CONNECTION_MANAGER +from .start_server import start_server + +start_server( + agent_application=AGENT_APP, + auth_configuration=CONNECTION_MANAGER.get_default_connection_configuration(), +) diff --git a/test_samples/agentic-test/src/start_server.py b/test_samples/agentic-test/src/start_server.py new file mode 100644 index 00000000..d76b619e --- /dev/null +++ b/test_samples/agentic-test/src/start_server.py @@ -0,0 +1,32 @@ +from os import environ +from microsoft_agents.hosting.core import AgentApplication, AgentAuthConfiguration +from microsoft_agents.hosting.aiohttp import ( + start_agent_process, + jwt_authorization_middleware, + CloudAdapter, +) +from aiohttp.web import Request, Response, Application, run_app + + +def start_server( + agent_application: AgentApplication, auth_configuration: AgentAuthConfiguration +): + async def entry_point(req: Request) -> Response: + agent: AgentApplication = req.app["agent_app"] + adapter: CloudAdapter = req.app["adapter"] + return await start_agent_process( + req, + agent, + adapter, + ) + + APP = Application(middlewares=[jwt_authorization_middleware]) + APP.router.add_post("/api/messages", entry_point) + APP["agent_configuration"] = auth_configuration + APP["agent_app"] = agent_application + APP["adapter"] = agent_application.adapter + + try: + run_app(APP, host="localhost", port=environ.get("PORT", 3978)) + except Exception as error: + raise error