Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ async def test_memory_storage_integration(self):

# Verify data exists in memory storage
storage_key = user_state.get_storage_key(self.context)
stored_data = await memory_storage.read([storage_key])
stored_data = await memory_storage.read([storage_key], target_cls=TestDataItem)

assert storage_key in stored_data
assert stored_data[storage_key] is not None
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .blob_storage import BlobStorage, BlobStorageSettings
from .blob_storage import BlobStorage
from .blob_storage_config import BlobStorageConfig

__all__ = ["BlobStorage", "BlobStorageSettings"]
__all__ = ["BlobStorage", "BlobStorageConfig"]
Original file line number Diff line number Diff line change
@@ -1,174 +1,97 @@
# based on
# https://github.com/microsoft/botbuilder-python/blob/main/libraries/botbuilder-azure/botbuilder/azure/blob_storage.py

from typing import TypeVar
from io import BytesIO
import json
from typing import TypeVar, Union
from io import BytesIO

from azure.core.exceptions import (
HttpResponseError,
ResourceExistsError,
ResourceNotFoundError,
)
from azure.storage.blob.aio import (
ContainerClient,
BlobServiceClient,
)

from microsoft.agents.storage import StoreItem
from microsoft.agents.storage.storage import AsyncStorageBase
from microsoft.agents.storage._type_aliases import JSON
from microsoft.agents.storage import Storage, StoreItem
from microsoft.agents.storage.error_handling import ignore_error, is_status_code_error

StoreItemT = TypeVar("StoreItemT", bound=StoreItem)


class BlobStorageSettings:
    """Plain value holder for the settings used to reach a blob container.

    The constructor stores its four arguments unchanged; it performs no
    validation of its own.
    """

    def __init__(
        self,
        container_name: str,
        account_name: str = "",
        account_key: str = "",
        connection_string: str = "",
    ):
        """Store the container name and the connection details.

        :param container_name: Name of the blob container.
        :param account_name: Storage account name (defaults to ``""``).
        :param account_key: Storage account key (defaults to ``""``).
        :param connection_string: Full connection string (defaults to ``""``).
        """
        # Connection string first, then the account name/key pair.
        self.connection_string = connection_string
        self.account_name, self.account_key = account_name, account_key
        self.container_name = container_name
from .blob_storage_config import BlobStorageConfig

StoreItemT = TypeVar("StoreItemT", bound=StoreItem)

def convert_account_name_and_key_to_connection_string(
    settings: "BlobStorageSettings", endpoint_suffix: str = "core.windows.net"
) -> str:
    """Build an HTTPS storage connection string from an account name and key.

    :param settings: Object exposing ``account_name`` and ``account_key``.
    :param endpoint_suffix: Value placed in the ``EndpointSuffix`` field.
        Defaults to ``"core.windows.net"``, the value that used to be
        hard-coded, so existing callers get the same string as before.
    :return: The assembled connection string.
    :raises ValueError: If ``account_name`` or ``account_key`` is empty.
    """
    if not settings.account_name or not settings.account_key:
        raise ValueError(
            "account_name and account_key are both required for BlobStorageSettings if not using a connection string."
        )
    return (
        f"DefaultEndpointsProtocol=https;AccountName={settings.account_name};"
        f"AccountKey={settings.account_key};EndpointSuffix={endpoint_suffix}"
    )

class BlobStorage(AsyncStorageBase):

class BlobStorage(Storage):
def __init__(self, config: BlobStorageConfig):

def __init__(self, settings: BlobStorageSettings):
if not settings.container_name:
if not config.container_name:
raise ValueError("BlobStorage: Container name is required.")

connection_string: str = settings.connection_string
if not connection_string:
# New Azure Blob SDK only allows connection strings, but our SDK allows key+name.
# This is here for backwards compatibility.
connection_string = convert_account_name_and_key_to_connection_string(
settings
)

blob_service_client: BlobServiceClient = (
BlobServiceClient.from_connection_string(connection_string)
)
self.config = config

blob_service_client: BlobServiceClient = self._create_client()
self._container_client: ContainerClient = (
blob_service_client.get_container_client(settings.container_name)
blob_service_client.get_container_client(config.container_name)
)
self._initialized: bool = False

async def _initialize_container(self):
    """Create the blob container on first use and return the init flag.

    The container is only created while ``self._initialized`` is exactly
    ``False``; afterwards the stored flag is returned unchanged.

    :return: The value of ``self._initialized`` after the call.
    """
    if self._initialized is not False:
        # Nothing left to do; report the flag as it stands.
        return self._initialized

    # This should only happen once - assuming this is a singleton.
    # ContainerClient.exists() method is available in an unreleased version of the SDK. Until then, we use:
    try:
        await self._container_client.create_container()
    except ResourceExistsError:
        # The container is already there, which is the state we want.
        pass

    self._initialized = True
    return self._initialized

async def read(
    self, keys: list[str], *, target_cls: StoreItemT = None, **kwargs
) -> dict[str, StoreItemT]:
    """Retrieve entities from the configured blob container.

    Blobs are downloaded one key at a time. A key whose download fails with
    HTTP status 404 is left out of the result instead of raising.

    :param keys: The entity keys to read.
    :type keys: list[str]
    :param target_cls: The StoreItem class to deserialize retrieved values into.
    :type target_cls: type[StoreItem]
    :return: Mapping from each key that was found to its deserialized item.
    :rtype: dict[str, StoreItem]
    :raises ValueError: If ``keys`` is empty or ``target_cls`` is not given.
    :raises HttpResponseError: If a download fails with a status other than 404.
    :raises TypeError: If ``target_cls`` cannot deserialize a stored item.
    """
    if not keys:
        raise ValueError("BlobStorage.read(): Keys are required when reading.")
    if not target_cls:
        raise ValueError("BlobStorage.read(): target_cls cannot be None.")

    await self._initialize_container()

    result: dict[str, StoreItem] = {}
    for key in keys:
        try:
            downloader = await self._container_client.download_blob(blob=key)
            item_rep = await downloader.readall()
        except HttpResponseError as error:
            if error.status_code == 404:
                # A missing blob just means there is no entry for this key.
                continue
            # Keep the original response (and so the status code) on the
            # re-raised error, and chain the cause so the traceback survives.
            raise HttpResponseError(
                message=f"BlobStorage.read(): Error reading blob '{key}': {error}",
                response=error.response,
            ) from error

        # Parsing happens outside the try: it cannot raise HttpResponseError.
        item_JSON: JSON = json.loads(item_rep)

        try:
            result[key] = target_cls.from_json_to_store_item(item_JSON)
        except AttributeError as error:
            raise TypeError(
                f"BlobStorage.read(): could not deserialize blob item into {target_cls} class. Error: {error}"
            ) from error

    return result

async def write(self, changes: dict[str, StoreItem]):
"""Stores a new entity in the configured blob container.

:param changes: The changes to write to storage.
:type changes: dict[str, StoreItem]
:return:
"""
if not changes:
raise ValueError("BlobStorage.write(): changes cannot be None nor empty")

await self._initialize_container()

for key, item in changes.items():

item_JSON: JSON = item.store_item_to_json()
if item_JSON is None:
def _create_client(self) -> BlobServiceClient:
if self.config.url: # connect with URL and credentials
if not self.config.credential:
raise ValueError(
"BlobStorage.write(): StoreItem serialization cannot return None"
"BlobStorage: Credential is required when using a custom service URL."
)
item_rep_bytes = json.dumps(item_JSON).encode("utf-8")

# providing the length parameter may improve performance
await self._container_client.upload_blob(
name=key,
data=BytesIO(item_rep_bytes),
overwrite=True,
length=len(item_rep_bytes),
return BlobServiceClient(
account_url=self.config.url, credential=self.config.credential
)

async def delete(self, keys: list[str]):
"""Deletes entity blobs from the configured container.
else: # connect with connection string
return BlobServiceClient.from_connection_string(
self.config.connection_string
)

:param keys: An array of entity keys.
:type keys: list[str]
"""
if keys is None:
raise ValueError("BlobStorage.delete(): keys parameter can't be null")
async def initialize(self) -> None:
    """Create the storage container the first time this is called.

    Later calls return immediately once ``self._initialized`` is truthy.
    """
    if self._initialized:
        return

    # This should only happen once - assuming this is a singleton.
    # NOTE(review): a 409 from create_container presumably means the
    # container already exists; confirm against ignore_error's contract.
    creation = self._container_client.create_container()
    await ignore_error(creation, is_status_code_error(409))
    self._initialized = True

async def _read_item(
    self, key: str, *, target_cls: StoreItemT = None, **kwargs
) -> tuple[Union[str, None], Union[StoreItemT, None]]:
    """Download one blob and deserialize it into ``target_cls``.

    :param key: Name of the blob to read.
    :param target_cls: Class providing ``from_json_to_store_item`` used to
        rebuild the stored item.
    :return: ``(key, item)`` on success, or ``(None, None)`` when the
        download yields nothing (a 404-status error is passed to
        ``ignore_error``, presumably to be suppressed; confirm there).
    :raises TypeError: If deserialization raises ``AttributeError``, for
        example because ``target_cls`` lacks ``from_json_to_store_item``.
    """
    item = await ignore_error(
        self._container_client.download_blob(blob=key),
        is_status_code_error(404),
    )
    if not item:
        return None, None

    item_rep = await item.readall()
    item_JSON: JSON = json.loads(item_rep)
    try:
        return key, target_cls.from_json_to_store_item(item_JSON)
    except AttributeError as error:
        # Chain explicitly so the AttributeError is recorded as the cause.
        raise TypeError(
            f"BlobStorage.read_item(): could not deserialize blob item into {target_cls} class. Error: {error}"
        ) from error

await self._initialize_container()
async def _write_item(self, key: str, item: StoreItem) -> None:
    """Serialize ``item`` to UTF-8 JSON and upload it as blob ``key``.

    An existing blob with the same name is overwritten.

    :param key: Name of the blob to write.
    :param item: Store item; its ``store_item_to_json()`` result is uploaded.
    :raises ValueError: If ``store_item_to_json()`` returns ``None``.
    """
    serialized: JSON = item.store_item_to_json()
    if serialized is None:
        raise ValueError(
            "BlobStorage.write(): StoreItem serialization cannot return None"
        )

    payload = json.dumps(serialized).encode("utf-8")
    upload_args = {
        "name": key,
        "data": BytesIO(payload),
        "overwrite": True,
        # getting the length is important for performance with large blobs
        "length": len(payload),
    }
    await self._container_client.upload_blob(**upload_args)

for key in keys:
try:
await self._container_client.delete_blob(blob=key)
# We can't delete what's already gone.
except ResourceNotFoundError:
pass
async def _delete_item(self, key: str) -> None:
    """Delete the blob named ``key`` from the container.

    The delete is awaited through ``ignore_error`` with a 404-status
    predicate (presumably so a missing blob is not an error; confirm
    against ``ignore_error``).

    :param key: Name of the blob to delete.
    """
    pending_delete = self._container_client.delete_blob(blob=key)
    not_found = is_status_code_error(404)
    await ignore_error(pending_delete, not_found)
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from typing import Union

from azure.core.credentials import TokenCredential


class BlobStorageConfig:
    """Configuration settings for BlobStorage.

    This class only stores the values it is given; it does no validation.
    """

    def __init__(
        self,
        container_name: str,
        connection_string: str = "",
        url: str = "",
        credential: "Union[TokenCredential, None]" = None,
    ):
        """Record how to reach the blob container.

        :param container_name: The name of the blob container.
        :param connection_string: The connection string to the storage
            account.
        :param url: The URL of the blob service. If provided, credential
            must also be provided.
        :param credential: The TokenCredential to use for authentication
            when using a custom URL.

        URL-plus-credential authentication is prioritized over connection
        string authentication.
        """
        # Authentication inputs first, then the container they apply to.
        self.credential: "Union[TokenCredential, None]" = credential
        self.url: str = url
        self.connection_string: str = connection_string
        self.container_name: str = container_name
Loading