Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,6 @@ cython_debug/

# vscode
.vscode/

# Binary files
bin/
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from .bot_type import BotType
from .connection_settings import ConnectionSettings
from .copilot_client import CopilotClient
from .direct_to_engine_connection_settings_protocol import (
DirectToEngineConnectionSettingsProtocol,
)
from .execute_turn_request import ExecuteTurnRequest
from .power_platform_cloud import PowerPlatformCloud
from .power_platform_environment import PowerPlatformEnvironment

__all__ = [
"BotType",
"ConnectionSettings",
"CopilotClient",
"DirectToEngineConnectionSettingsProtocol",
"ExecuteTurnRequest",
"PowerPlatformCloud",
"PowerPlatformEnvironment",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from enum import Enum


class BotType(str, Enum):
PUBLISHED = "published"
PREBUILT = "prebuilt"
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Optional
from .direct_to_engine_connection_settings_protocol import (
DirectToEngineConnectionSettingsProtocol,
)
from .power_platform_cloud import PowerPlatformCloud
from .bot_type import BotType


class ConnectionSettings(DirectToEngineConnectionSettingsProtocol):
"""
Connection settings for the DirectToEngineConnectionConfiguration.
"""

def __init__(
self,
environment_id: str,
bot_identifier: str,
cloud: Optional[PowerPlatformCloud],
copilot_bot_type: Optional[BotType],
custom_power_platform_cloud: Optional[str],
) -> None:
self.environment_id = environment_id
self.bot_identifier = bot_identifier

if not self.environment_id:
raise ValueError("Environment ID must be provided")
if not self.bot_identifier:
raise ValueError("Bot Identifier must be provided")

self.cloud = cloud or PowerPlatformCloud.UNKNOWN
self.copilot_bot_type = copilot_bot_type or BotType.PUBLISHED
self.custom_power_platform_cloud = custom_power_platform_cloud
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import aiohttp
from typing import AsyncIterable, Callable, Optional

from microsoft.agents.core.models import Activity, ActivityTypes, ConversationAccount

from .connection_settings import ConnectionSettings
from .execute_turn_request import ExecuteTurnRequest
from .power_platform_environment import PowerPlatformEnvironment


class CopilotClient:
EVENT_STREAM_TYPE = "text/event-stream"
APPLICATION_JSON_TYPE = "application/json"

_current_conversation_id = ""

def __init__(
self,
settings: ConnectionSettings,
token: str,
):
self.settings = settings
self._token = token
# TODO: Add logger
# self.logger = logger
self.conversation_id = ""

async def post_request(
self, url: str, data: dict, headers: dict
) -> AsyncIterable[Activity]:
async with aiohttp.ClientSession() as session:
async with session.post(url, json=data, headers=headers) as response:
if response.status != 200:
# self.logger(f"Error sending request: {response.status}")
raise aiohttp.ClientError(
f"Error sending request: {response.status}"
)
event_type = None
async for line in response.content:
if line.startswith(b"event:"):
event_type = line[6:].decode("utf-8").strip()
if line.startswith(b"data:") and event_type == "activity":
activity_data = line[5:].decode("utf-8").strip()
activity = Activity.model_validate_json(activity_data)

if activity.type == ActivityTypes.message:
self._current_conversation_id = activity.conversation.id

yield activity

async def start_conversation(
self, emit_start_conversation_event: bool = True
) -> AsyncIterable[Activity]:
url = PowerPlatformEnvironment.get_copilot_studio_connection_url(
settings=self.settings
)
data = {"emitStartConversationEvent": emit_start_conversation_event}
headers = {
"Content-Type": self.APPLICATION_JSON_TYPE,
"Authorization": f"Bearer {self._token}",
"Accept": self.EVENT_STREAM_TYPE,
}

async for activity in self.post_request(url, data, headers):
yield activity

async def ask_question(
self, question: str, conversation_id: Optional[str] = None
) -> AsyncIterable[Activity]:
activity = Activity(
type="message",
text=question,
conversation=ConversationAccount(
id=conversation_id or self._current_conversation_id
),
)

async for activity in self.ask_question_with_activity(activity):
yield activity

async def ask_question_with_activity(
self, activity: Activity
) -> AsyncIterable[Activity]:
if not activity:
raise ValueError(
"CopilotClient.ask_question_with_activity: Activity cannot be None"
)

local_conversation_id = (
activity.conversation.id or self._current_conversation_id
)

url = PowerPlatformEnvironment.get_copilot_studio_connection_url(
settings=self.settings, conversation_id=local_conversation_id
)
data = ExecuteTurnRequest(activity=activity).model_dump(
mode="json", by_alias=True, exclude_unset=True
)
headers = {
"Content-Type": self.APPLICATION_JSON_TYPE,
"Authorization": f"Bearer {self._token}",
"Accept": self.EVENT_STREAM_TYPE,
}

async for activity in self.post_request(url, data, headers):
yield activity
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from typing import Protocol, Optional

from .bot_type import BotType
from .power_platform_cloud import PowerPlatformCloud


class DirectToEngineConnectionSettingsProtocol(Protocol):
"""
Protocol for DirectToEngineConnectionConfiguration.
"""

# Schema name for the Copilot Studio Hosted Copilot.
bot_identifier: Optional[str]

# if PowerPlatformCloud is set to Other, this is the url for the power platform API endpoint.
custom_power_platform_cloud: Optional[str]

# Environment ID for the environment that hosts the bot
environment_id: Optional[str]

# Power Platform Cloud where the environment is hosted
cloud: Optional[PowerPlatformCloud]

# Type of Bot hosted in Copilot Studio
copilot_bot_type: Optional[BotType]
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from microsoft.agents.core.models import AgentsModel, Activity


class ExecuteTurnRequest(AgentsModel):

activity: Activity
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from enum import Enum


class PowerPlatformCloud(str, Enum):
"""
Enum representing different Power Platform Clouds.
"""

UNKNOWN = "Unknown"
EXP = "Exp"
DEV = "Dev"
TEST = "Test"
PREPROD = "Preprod"
FIRST_RELEASE = "FirstRelease"
PROD = "Prod"
GOV = "Gov"
HIGH = "High"
DOD = "DoD"
MOONCAKE = "Mooncake"
EX = "Ex"
RX = "Rx"
PRV = "Prv"
LOCAL = "Local"
GOV_FR = "GovFR"
OTHER = "Other"
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from urllib.parse import urlparse, urlunparse
from typing import Optional
from .connection_settings import ConnectionSettings
from .bot_type import BotType
from .power_platform_cloud import PowerPlatformCloud


# TODO: POC provides the
class PowerPlatformEnvironment:
"""
Class representing the Power Platform Environment.
"""

API_VERSION = "2022-03-01-preview"

@staticmethod
def get_copilot_studio_connection_url(
settings: ConnectionSettings,
conversation_id: Optional[str] = None,
bot_type: BotType = BotType.PUBLISHED,
cloud: PowerPlatformCloud = PowerPlatformCloud.PROD,
cloud_base_address: Optional[str] = None,
) -> str:
if cloud == PowerPlatformCloud.OTHER and not cloud_base_address:
raise ValueError(
"cloud_base_address must be provided when PowerPlatformCloud is Other"
)
if not settings.environment_id:
raise ValueError("EnvironmentId must be provided")
if not settings.bot_identifier:
raise ValueError("BotIdentifier must be provided")
if settings.cloud and settings.cloud != PowerPlatformCloud.UNKNOWN:
cloud = settings.cloud
if cloud == PowerPlatformCloud.OTHER:
parsed_url = urlparse(cloud_base_address)
is_absolute_url = parsed_url.scheme and parsed_url.netloc
if cloud_base_address and is_absolute_url:
pass
elif settings.custom_power_platform_cloud:
cloud_base_address = settings.custom_power_platform_cloud
else:
raise ValueError(
"Either CustomPowerPlatformCloud or cloud_base_address must be provided when PowerPlatformCloud is Other"
)
if settings.copilot_bot_type:
bot_type = settings.copilot_bot_type

cloud_base_address = cloud_base_address or "api.unknown.powerplatform.com"
host = PowerPlatformEnvironment.get_environment_endpoint(
cloud, settings.environment_id, cloud_base_address
)
return PowerPlatformEnvironment.create_uri(
settings.bot_identifier, host, bot_type, conversation_id
)

@staticmethod
def get_token_audience(
settings: Optional[ConnectionSettings] = None,
cloud: PowerPlatformCloud = PowerPlatformCloud.UNKNOWN,
cloud_base_address: Optional[str] = None,
) -> str:
if cloud == PowerPlatformCloud.OTHER and not cloud_base_address:
raise ValueError(
"cloud_base_address must be provided when PowerPlatformCloud is Other"
)
if not settings and cloud == PowerPlatformCloud.UNKNOWN:
raise ValueError("Either settings or cloud must be provided")
if settings and settings.cloud and settings.cloud != PowerPlatformCloud.UNKNOWN:
cloud = settings.cloud
if cloud == PowerPlatformCloud.OTHER:
if cloud_base_address and urlparse(cloud_base_address).scheme:
cloud = PowerPlatformCloud.OTHER
elif (
settings
and settings.custom_power_platform_cloud
and urlparse(settings.custom_power_platform_cloud).scheme
):
cloud = PowerPlatformCloud.OTHER
cloud_base_address = settings.custom_power_platform_cloud
else:
raise ValueError(
"Either CustomPowerPlatformCloud or cloud_base_address must be provided when PowerPlatformCloud is Other"
)

cloud_base_address = cloud_base_address or "api.unknown.powerplatform.com"
return f"https://{PowerPlatformEnvironment.get_endpoint_suffix(cloud, cloud_base_address)}/.default"

@staticmethod
def create_uri(
bot_identifier: str,
host: str,
bot_type: BotType,
conversation_id: Optional[str],
) -> str:
bot_path_name = (
"dataverse-backed" if bot_type == BotType.PUBLISHED else "prebuilt"
)
path = f"/copilotstudio/{bot_path_name}/authenticated/bots/{bot_identifier}/conversations"
if conversation_id:
path += f"/{conversation_id}"
return urlunparse(
(
"https",
host,
path,
"",
f"api-version={PowerPlatformEnvironment.API_VERSION}",
"",
)
)

@staticmethod
def get_environment_endpoint(
cloud: PowerPlatformCloud,
environment_id: str,
cloud_base_address: Optional[str] = None,
) -> str:
if cloud == PowerPlatformCloud.OTHER and not cloud_base_address:
raise ValueError(
"cloud_base_address must be provided when PowerPlatformCloud is Other"
)
cloud_base_address = cloud_base_address or "api.unknown.powerplatform.com"
normalized_resource_id = environment_id.lower().replace("-", "")
id_suffix_length = PowerPlatformEnvironment.get_id_suffix_length(cloud)
hex_prefix = normalized_resource_id[:-id_suffix_length]
hex_suffix = normalized_resource_id[-id_suffix_length:]
return f"{hex_prefix}.{hex_suffix}.environment.{PowerPlatformEnvironment.get_endpoint_suffix(cloud, cloud_base_address)}"

@staticmethod
def get_endpoint_suffix(cloud: PowerPlatformCloud, cloud_base_address: str) -> str:
return {
PowerPlatformCloud.LOCAL: "api.powerplatform.localhost",
PowerPlatformCloud.EXP: "api.exp.powerplatform.com",
PowerPlatformCloud.DEV: "api.dev.powerplatform.com",
PowerPlatformCloud.PRV: "api.prv.powerplatform.com",
PowerPlatformCloud.TEST: "api.test.powerplatform.com",
PowerPlatformCloud.PREPROD: "api.preprod.powerplatform.com",
PowerPlatformCloud.FIRST_RELEASE: "api.powerplatform.com",
PowerPlatformCloud.PROD: "api.powerplatform.com",
PowerPlatformCloud.GOV_FR: "api.gov.powerplatform.microsoft.us",
PowerPlatformCloud.GOV: "api.gov.powerplatform.microsoft.us",
PowerPlatformCloud.HIGH: "api.high.powerplatform.microsoft.us",
PowerPlatformCloud.DOD: "api.appsplatform.us",
PowerPlatformCloud.MOONCAKE: "api.powerplatform.partner.microsoftonline.cn",
PowerPlatformCloud.EX: "api.powerplatform.eaglex.ic.gov",
PowerPlatformCloud.RX: "api.powerplatform.microsoft.scloud",
PowerPlatformCloud.OTHER: cloud_base_address,
}.get(cloud, ValueError(f"Invalid cloud category value: {cloud}"))

@staticmethod
def get_id_suffix_length(cloud: PowerPlatformCloud) -> int:
return (
2
if cloud in {PowerPlatformCloud.FIRST_RELEASE, PowerPlatformCloud.PROD}
else 1
)
Loading