Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
ba30c82
Implement asynchronous token retrieval methods in AgenticMsalAuth class
rodrigobr-msft Sep 23, 2025
755e973
get_agentic_user_token implementation
rodrigobr-msft Sep 23, 2025
c79d56b
get_agentic_user_token simplified implementation with ConfidentialCli…
rodrigobr-msft Sep 23, 2025
c671841
Enhance AgenticMsalAuth: update get_agentic_instance_token to return …
rodrigobr-msft Sep 23, 2025
b06af40
Supporting authorization variants
rodrigobr-msft Sep 24, 2025
4ce735e
Continued auth refactor
rodrigobr-msft Sep 25, 2025
ac48fbc
Addressing continuation activity
rodrigobr-msft Sep 25, 2025
a078bd6
Adding authorization tests
rodrigobr-msft Sep 25, 2025
860fade
Passing Authorization tests
rodrigobr-msft Sep 25, 2025
56d46b8
Basic AgenticAuthorization tests
rodrigobr-msft Sep 25, 2025
e514ea8
Added AgenticAuthorization tests
rodrigobr-msft Sep 25, 2025
83c6688
Finalized fundamental unit tests for agentic auth scenarios
rodrigobr-msft Sep 25, 2025
2656435
Formatting
rodrigobr-msft Sep 26, 2025
ef01438
Formatting
rodrigobr-msft Sep 26, 2025
185acfc
Address review comments and more breaking changes
rodrigobr-msft Sep 26, 2025
770cf69
Shifting around exchange token logic
rodrigobr-msft Sep 27, 2025
389b6cd
Adding dynamic loading of connection and related tests
rodrigobr-msft Sep 27, 2025
8a41020
Aligning authorization handlers with how .NET does it
rodrigobr-msft Sep 27, 2025
a0df177
get_token_provider implemented and tested
rodrigobr-msft Sep 29, 2025
ea54c51
Tested UserAuthorization and AgenticUserAuthorization classes
rodrigobr-msft Sep 29, 2025
84edf3a
Finalized refactor tests
rodrigobr-msft Sep 29, 2025
82e4ae5
Sample compat
rodrigobr-msft Sep 29, 2025
6611ed8
Compat changes
rodrigobr-msft Sep 29, 2025
98436a8
Passing all tests again
rodrigobr-msft Sep 30, 2025
c3c3db3
Repurposing SignInState
rodrigobr-msft Sep 30, 2025
3d7a962
Completed tests for auth fix
rodrigobr-msft Sep 30, 2025
a34cdfb
Changes to avoid auth on typing
rodrigobr-msft Sep 30, 2025
3100fc0
Changes to avoid auth on typing
rodrigobr-msft Sep 30, 2025
04eaf7e
Enable passing TurnContext into create_connector_client
rodrigobr-msft Sep 30, 2025
86dfac2
Tweaks to work almost end-to-end / fixing connector client construction
rodrigobr-msft Oct 1, 2025
b564193
Moving agentic static methods to be instance methods of Activity and …
rodrigobr-msft Oct 1, 2025
f791827
Addressing PR review comments
rodrigobr-msft Oct 1, 2025
a82c1f7
Reformatting files with black
rodrigobr-msft Oct 1, 2025
511b9a8
Merge branch 'main' of https://github.com/microsoft/Agents-for-python…
rodrigobr-msft Oct 1, 2025
fe197cb
Fixing test case
rodrigobr-msft Oct 1, 2025
1f2ce31
Removing unneeded subchannel constants
rodrigobr-msft Oct 2, 2025
8883dd4
Logging blueprint id only if debugging
rodrigobr-msft Oct 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Dict
from typing import Any


def load_configuration_from_env(env_vars: Dict[str, Any]) -> dict:
def load_configuration_from_env(env_vars: dict[str, Any]) -> dict:
"""
Parses environment variables and returns a dictionary with the relevant configuration.
"""
Expand All @@ -18,6 +18,11 @@ def load_configuration_from_env(env_vars: Dict[str, Any]) -> dict:
current_level = current_level[next_level]
last_level[levels[-1]] = value

if result.get("CONNECTIONSMAP") and isinstance(result["CONNECTIONSMAP"], dict):
result["CONNECTIONSMAP"] = [
conn for conn in result.get("CONNECTIONSMAP", {}).values()
]

return {
"AGENTAPPLICATION": result.get("AGENTAPPLICATION", {}),
"CONNECTIONS": result.get("CONNECTIONS", {}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .text_highlight import TextHighlight
from .semantic_action import SemanticAction
from .agents_model import AgentsModel
from .role_types import RoleTypes
from ._model_utils import pick_model, SkipNone
from ._type_aliases import NonEmptyString

Expand Down Expand Up @@ -648,3 +649,21 @@ def add_ai_metadata(
self.entities = []

self.entities.append(ai_entity)

def is_agentic_request(self) -> bool:
return self.recipient and self.recipient.role in [
RoleTypes.agentic_identity,
RoleTypes.agentic_user,
]

def get_agentic_instance_id(self) -> Optional[str]:
"""Gets the agent instance ID from the context if it's an agentic request."""
if not self.is_agentic_request() or not self.recipient:
return None
return self.recipient.agentic_app_id

def get_agentic_user(self) -> Optional[str]:
"""Gets the agentic user (UPN) from the context if it's an agentic request."""
if not self.is_agentic_request() or not self.recipient:
return None
return self.recipient.id
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class ChannelAccount(AgentsModel):
name: str = None
aad_object_id: NonEmptyString = None
role: NonEmptyString = None
agentic_user_id: NonEmptyString = None
agentic_app_id: NonEmptyString = None
tenant_id: NonEmptyString = None

@property
def properties(self) -> dict[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ class Channels(str, Enum):
Ids of channels supported by ABS.
"""

"""Agents channel."""
agents = "agents"

console = "console"
"""Console channel."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ class RoleTypes(str, Enum):
user = "user"
agent = "bot"
skill = "skill"
agentic_identity = "agenticAppInstance"
agentic_user = "agenticUser"
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import jwt

from .agents_model import AgentsModel
from ._type_aliases import NonEmptyString

Expand All @@ -26,3 +28,19 @@ class TokenResponse(AgentsModel):

def __bool__(self):
return bool(self.token)

def is_exchangeable(self) -> bool:
"""
Checks if a token is exchangeable (has api:// audience).

:param token: The token to check.
:type token: str
:return: True if the token is exchangeable, False otherwise.
"""
try:
# Decode without verification to check the audience
payload = jwt.decode(self.token, options={"verify_signature": False})
aud = payload.get("aud")
return isinstance(aud, str) and aud.startswith("api://")
except Exception:
return False
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import jwt
from typing import Optional
from urllib.parse import urlparse, ParseResult as URI
from msal import (
Expand All @@ -23,6 +24,18 @@
logger = logging.getLogger(__name__)


# this is deferred because jwt.decode is expensive and we don't want to do it unless we
# have logging.DEBUG enabled
class _DeferredLogOfBlueprintId:
def __init__(self, jwt_token: str):
self.jwt_token = jwt_token

def __str__(self):
payload = jwt.decode(self.jwt_token, options={"verify_signature": False})
agentic_blueprint_id = payload.get("xms_par_app_azp")
return f"Agentic blueprint id: {agentic_blueprint_id}"


class MsalAuth(AccessTokenProviderBase):

_client_credential_cache = None
Expand Down Expand Up @@ -56,11 +69,16 @@ async def get_access_token(
auth_result_payload = msal_auth_client.acquire_token_for_client(
scopes=local_scopes
)
else:
auth_result_payload = None

# TODO: Handling token error / acquisition failed
return auth_result_payload["access_token"]
res = auth_result_payload.get("access_token") if auth_result_payload else None
if not res:
logger.error("Failed to acquire token for resource %s", auth_result_payload)
raise ValueError(f"Failed to acquire token. {str(auth_result_payload)}")
return res

async def aquire_token_on_behalf_of(
async def acquire_token_on_behalf_of(
self, scopes: list[str], user_assertion: str
) -> str:
"""
Expand Down Expand Up @@ -186,3 +204,189 @@ def _resolve_scopes_list(self, instance_url: URI, scopes=None) -> list[str]:
temp_list.append(scope_placeholder)
logger.debug(f"Resolved scopes: {temp_list}")
return temp_list

# the call to MSAL is blocking, but in the future we want to create an asyncio task
# to avoid this
async def get_agentic_application_token(
self, agent_app_instance_id: str
) -> Optional[str]:
"""Gets the agentic application token for the given agent application instance ID.

:param agent_app_instance_id: The agent application instance ID.
:type agent_app_instance_id: str
:return: The agentic application token, or None if not found.
:rtype: Optional[str]
"""

if not agent_app_instance_id:
raise ValueError("Agent application instance Id must be provided.")

logger.info(
"Attempting to get agentic application token from agent_app_instance_id %s",
agent_app_instance_id,
)
msal_auth_client = self._create_client_application()

if isinstance(msal_auth_client, ConfidentialClientApplication):

# https://github.dev/AzureAD/microsoft-authentication-library-for-dotnet
auth_result_payload = msal_auth_client.acquire_token_for_client(
["api://AzureAdTokenExchange/.default"],
data={"fmi_path": agent_app_instance_id},
)

if auth_result_payload:
return auth_result_payload.get("access_token")

return None

async def get_agentic_instance_token(
self, agent_app_instance_id: str
) -> tuple[str, str]:
"""Gets the agentic instance token for the given agent application instance ID.

:param agent_app_instance_id: The agent application instance ID.
:type agent_app_instance_id: str
:return: A tuple containing the agentic instance token and the agent application token.
:rtype: tuple[str, str]
"""

if not agent_app_instance_id:
raise ValueError("Agent application instance Id must be provided.")

logger.info(
"Attempting to get agentic instance token from agent_app_instance_id %s",
agent_app_instance_id,
)
agent_token_result = await self.get_agentic_application_token(
agent_app_instance_id
)

if not agent_token_result:
logger.error(
"Failed to acquire agentic instance token or agent token for agent_app_instance_id %s",
agent_app_instance_id,
)
raise Exception(
f"Failed to acquire agentic instance token or agent token for agent_app_instance_id {agent_app_instance_id}"
)

authority = (
f"https://login.microsoftonline.com/{self._msal_configuration.TENANT_ID}"
)

instance_app = ConfidentialClientApplication(
client_id=agent_app_instance_id,
authority=authority,
client_credential={"client_assertion": agent_token_result},
)

agentic_instance_token = instance_app.acquire_token_for_client(
["api://AzureAdTokenExchange/.default"]
)

if not agentic_instance_token:
logger.error(
"Failed to acquire agentic instance token or agent token for agent_app_instance_id %s",
agent_app_instance_id,
)
raise Exception(
f"Failed to acquire agentic instance token or agent token for agent_app_instance_id {agent_app_instance_id}"
)

# future scenario where we don't know the blueprint id upfront

token = agentic_instance_token.get("access_token")
if not token:
logger.error(
"Failed to acquire agentic instance token, %s", agentic_instance_token
)
raise ValueError(f"Failed to acquire token. {str(agentic_instance_token)}")

logger.debug(_DeferredLogOfBlueprintId(token))

return agentic_instance_token["access_token"], agent_token_result

async def get_agentic_user_token(
self, agent_app_instance_id: str, upn: str, scopes: list[str]
) -> Optional[str]:
"""Gets the agentic user token for the given agent application instance ID and user principal name and the scopes.

:param agent_app_instance_id: The agent application instance ID.
:type agent_app_instance_id: str
:param upn: The user principal name.
:type upn: str
:param scopes: The scopes to request for the token.
:type scopes: list[str]
:return: The agentic user token, or None if not found.
:rtype: Optional[str]
"""
if not agent_app_instance_id or not upn:
raise ValueError(
"Agent application instance Id and user principal name must be provided."
)

logger.info(
"Attempting to get agentic user token from agent_app_instance_id %s and upn %s",
agent_app_instance_id,
upn,
)
instance_token, agent_token = await self.get_agentic_instance_token(
agent_app_instance_id
)

if not instance_token or not agent_token:
logger.error(
"Failed to acquire instance token or agent token for agent_app_instance_id %s and upn %s",
agent_app_instance_id,
upn,
)
raise Exception(
f"Failed to acquire instance token or agent token for agent_app_instance_id {agent_app_instance_id} and upn {upn}"
)

authority = (
f"https://login.microsoftonline.com/{self._msal_configuration.TENANT_ID}"
)

instance_app = ConfidentialClientApplication(
client_id=agent_app_instance_id,
authority=authority,
client_credential={"client_assertion": agent_token},
)

logger.info(
"Acquiring agentic user token for agent_app_instance_id %s and upn %s",
agent_app_instance_id,
upn,
)
auth_result_payload = instance_app.acquire_token_for_client(
scopes,
data={
"username": upn,
"user_federated_identity_credential": instance_token,
"grant_type": "user_fic",
},
)

if not auth_result_payload:
logger.error(
"Failed to acquire agentic user token for agent_app_instance_id %s and upn %s, %s",
agent_app_instance_id,
upn,
auth_result_payload,
)
return None

access_token = auth_result_payload.get("access_token")
if not access_token:
logger.error(
"Failed to acquire agentic user token for agent_app_instance_id %s and upn %s, %s",
agent_app_instance_id,
upn,
auth_result_payload,
)
return None

logger.info("Acquired agentic user token response.")
return access_token
Loading