From 7cedda339100412be80ac5ebfc7b5ae7d401a2da Mon Sep 17 00:00:00 2001 From: Chris Mullins Date: Mon, 6 Oct 2025 14:50:25 -0700 Subject: [PATCH 1/4] Add FileTranscriptStore implementation and update transcript handling - Introduced FileTranscriptStore for logging activities to a file. - Updated TranscriptLogger and TranscriptMemoryStore to utilize PagedResult for consistent return types. - Enhanced tests for FileTranscriptStore to validate logging, listing, and retrieving transcripts. --- .../hosting/core/storage/__init__.py | 6 +- .../core/storage/transcript_file_store.py | 284 ++++++++++++++++++ .../hosting/core/storage/transcript_logger.py | 7 +- .../core/storage/transcript_memory_store.py | 14 +- .../storage/test_file_transcript_storage.py | 154 ++++++++++ .../test_transcript_logger_middleware.py | 15 +- .../storage/test_transcript_store_memory.py | 169 ++++------- 7 files changed, 528 insertions(+), 121 deletions(-) create mode 100644 libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py create mode 100644 tests/hosting_core/storage/test_file_transcript_storage.py diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/__init__.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/__init__.py index 8e2f956a..f53ead89 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/__init__.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/__init__.py @@ -7,8 +7,10 @@ ConsoleTranscriptLogger, TranscriptLoggerMiddleware, FileTranscriptLogger, + PagedResult ) from .transcript_store import TranscriptStore +from .transcript_file_store import FileTranscriptStore __all__ = [ "StoreItem", @@ -21,4 +23,6 @@ "TranscriptLoggerMiddleware", "TranscriptStore", "FileTranscriptLogger", -] + "FileTranscriptStore", + "PagedResult" +] \ No newline at end of file diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py new file mode 100644 index 00000000..3d1102af --- /dev/null +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py @@ -0,0 +1,284 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from __future__ import annotations + +import asyncio +import json +import os +import re + +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union + +from .transcript_logger import TranscriptLogger +from .transcript_logger import PagedResult +from .transcript_info import TranscriptInfo + +from microsoft_agents.activity import Activity # type: ignore + +class FileTranscriptStore(TranscriptLogger): + """ + Python port of the .NET FileTranscriptStore which creates a single + `.transcript` file per conversation and appends each Activity as newline-delimited JSON. + + Layout on disk: + //.transcript + + - Each line is a JSON object representing one Activity. + - Methods are async to match the Agents SDK shape. + + Notes + ----- + * Continuation tokens are simple integer byte offsets encoded as strings. + * Activities are written using UTF-8 with newline separators (JSONL). + * Filenames are sanitized to avoid path traversal and invalid characters. 
+ + Inspired by the .NET design for FileTranscriptLogger. See: + - Microsoft.Bot.Builder FileTranscriptLogger docs (for behavior) [DOTNET] + - Microsoft.Agents.Storage.Transcript namespace overview [AGENTS] + """ + + def __init__(self, root_folder: Union[str, Path]) -> None: + self._root = Path(root_folder).expanduser().resolve() + self._root.mkdir(parents=True, exist_ok=True) + + # precompiled regex for safe names (letters, digits, dash, underscore, dot) + self._safe = re.compile(r"[^A-Za-z0-9._-]+") + + # -------- Logger surface -------- + + async def log_activity(self, activity: Activity) -> None: + """ + Append a single Activity to its conversation transcript. + + Parameters + ---------- + activity : Activity + An activity object (from microsoft_agents.activity.Activity or a dict with similar shape) + Must contain: + - channel_id (str) -> activity["channel_id"] or activity.channel_id + - conversation.id (str) -> activity["conversation"]["id"] or activity.conversation.id + """ + if not activity: + raise ValueError("Activity is required") + + channel_id, conversation_id = _get_ids(activity) + file_path = self._file_path(channel_id, conversation_id) + file_path.parent.mkdir(parents=True, exist_ok=True) + + # Ensure a stable timestamp property if absent + # Write in a background thread to avoid blocking the event loop + def _write() -> None: + # Normalize to a dict to ensure json serializable content. + obj = _to_plain_dict(activity) + obj.setdefault("timestamp", _utc_iso_now()) + + with open(file_path, "a", encoding="utf-8", newline="\n") as f: + json.dump(obj, f, ensure_ascii=False, separators=(",", ":")) + f.write("\n") + + await asyncio.to_thread(_write) + + # -------- Store surface -------- + + async def list_transcripts(self, channel_id: str) -> PagedResult: + """ + List transcripts (conversations) for a channel. + + Returns TranscriptInfo for each `.transcript` found. + """ + channel_dir = self._channel_dir(channel_id) + + def _list() -> List[TranscriptInfo]: + if not channel_dir.exists(): + return [] + results: List[TranscriptInfo] = [] + for p in channel_dir.glob("*.transcript"): + # mtime is a reasonable proxy for 'created/updated' + created = datetime.fromtimestamp(p.stat().st_mtime, tz=timezone.utc) + results.append( + TranscriptInfo( + channel_id=_sanitize(self._safe, channel_id), + conversation_id=p.stem, + created_on=created, + ) + ) + # Sort newest first (consistent, useful default) + results.sort(key=lambda t: t.created_on, reverse=True) + return results + + items = await asyncio.to_thread(_list) + return PagedResult(items=items, continuation_token=None) + + async def get_transcript_activities( + self, + channel_id: str, + conversation_id: str, + continuation_token: Optional[str] = None, + start_date: Optional[datetime] = None, + page_bytes: int = 512 * 1024, + ) -> PagedResult: + """ + Read activities from the transcript file (paged by byte size). + + Parameters + ---------- + continuation_token : str, optional + An opaque token from a previous call. In this implementation it's the + byte offset (str(int)). Omit or pass None to start at the beginning. + start_date : datetime, optional + If provided, only activities with `timestamp >= start_date` are returned. + (Compares the 'timestamp' property when present; otherwise includes the line.) + page_bytes : int + Max bytes to read from file on this call (not a hard activity count limit). 
+ + Returns + ------- + PagedResult with: + - items: List[Activity-like dicts] + - continuation_token: str offset for next page, or None if EOF + """ + file_path = self._file_path(channel_id, conversation_id) + + def _read_page() -> Tuple[List[Dict[str, Any]], Optional[str]]: + if not file_path.exists(): + return [], None + + offset = int(continuation_token) if continuation_token else 0 + results: List[Dict[str, Any]] = [] + + with open(file_path, "rb") as f: + f.seek(0, os.SEEK_END) + end = f.tell() + if offset > end: + return [], None + f.seek(offset) + # Read a chunk + raw = f.read(page_bytes) + # Extend to end of current line to avoid cutting a JSON record in half + # (read until newline or EOF) + while True: + ch = f.read(1) + if not ch: + break + raw += ch + if ch == b"\n": + break + + next_offset = f.tell() + # Decode and split lines + text = raw.decode("utf-8", errors="ignore") + lines = [ln for ln in text.splitlines() if ln.strip()] + + # Parse JSONL + for ln in lines: + try: + obj = json.loads(ln) + except Exception: + # Skip malformed lines + continue + if start_date: + ts = _parse_iso_utc(obj.get("timestamp")) + if ts and ts < start_date.astimezone(timezone.utc): + continue + results.append(obj) + + token = str(next_offset) if next_offset < end else None + return results, token + + items, token = await asyncio.to_thread(_read_page) + return PagedResult(items=items, continuation_token=token) + + async def delete_transcript(self, channel_id: str, conversation_id: str) -> None: + """Delete the specified conversation transcript file (no-op if absent).""" + file_path = self._file_path(channel_id, conversation_id) + + def _delete() -> None: + try: + file_path.unlink(missing_ok=True) + except Exception: + # Best-effort deletion: ignore failures (locked file, etc.) 
+ pass + + await asyncio.to_thread(_delete) + + # ---------------------------- + # Helpers + # ---------------------------- + + def _channel_dir(self, channel_id: str) -> Path: + return self._root / _sanitize(self._safe, channel_id) + + def _file_path(self, channel_id: str, conversation_id: str) -> Path: + safe_channel = _sanitize(self._safe, channel_id) + safe_conv = _sanitize(self._safe, conversation_id) + return self._root / safe_channel / f"{safe_conv}.transcript" + + +# ---------------------------- +# Module-level helpers +# ---------------------------- + +def _sanitize(pattern: re.Pattern[str], value: str) -> str: + # Replace path-separators and illegal filename chars with '-' + value = (value or "").strip().replace(os.sep, "-").replace("/", "-") + value = pattern.sub("-", value) + return value or "unknown" + +def _get_ids(activity: Activity) -> Tuple[str, str]: + # Works with both dict-like and object-like Activity + def _get(obj: Any, *path: str) -> Optional[Any]: + cur = obj + for key in path: + if cur is None: + return None + if isinstance(cur, dict): + cur = cur.get(key) + else: + cur = getattr(cur, key, None) + return cur + + channel_id = _get(activity, "channel_id") or _get(activity, "channelId") + conversation_id = _get(activity, "conversation", "id") + if not channel_id or not conversation_id: + raise ValueError("Activity must include channel_id and conversation.id") + return str(channel_id), str(conversation_id) + +def _to_plain_dict(activity: Activity) -> Dict[str, Any]: + if isinstance(activity, dict): + return activity + # Best-effort conversion for dataclass/attr/objects + try: + import dataclasses + if dataclasses.is_dataclass(activity): + return dataclasses.asdict(activity) # type: ignore[arg-type] + except Exception: + pass + try: + return json.loads(json.dumps(activity, default=lambda o: getattr(o, "__dict__", str(o)))) + except Exception: + # Fallback: minimal projection + channel_id, conversation_id = _get_ids(activity) + return { + "type": getattr(activity, "type", "message"), + "id": getattr(activity, "id", None), + "channel_id": channel_id, + "conversation": {"id": conversation_id}, + "text": getattr(activity, "text", None), + } + +def _utc_iso_now() -> str: + return datetime.now(timezone.utc).isoformat() + +def _parse_iso_utc(value: Optional[str]) -> Optional[datetime]: + if not value: + return None + try: + # Allow fromisoformat with 'Z' or offset + if value.endswith("Z"): + value = value[:-1] + "+00:00" + return datetime.fromisoformat(value).astimezone(timezone.utc) + except Exception: + return None diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_logger.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_logger.py index 6fba5b77..7032cb63 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_logger.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_logger.py @@ -5,10 +5,12 @@ import string import json +from typing import Any, Optional from abc import ABC, abstractmethod from datetime import datetime, timezone from queue import Queue from typing import Awaitable, Callable, List, Optional +from dataclasses import dataclass from microsoft_agents.activity import Activity, ChannelAccount from microsoft_agents.activity.activity import ConversationReference @@ -16,6 +18,10 @@ from microsoft_agents.activity.conversation_reference import ActivityEventNames from 
microsoft_agents.hosting.core.middleware_set import Middleware, TurnContext +@dataclass +class PagedResult: + items: List[Any] + continuation_token: Optional[str] = None class TranscriptLogger(ABC): @abstractmethod @@ -27,7 +33,6 @@ async def log_activity(self, activity: Activity) -> None: """ pass - class ConsoleTranscriptLogger(TranscriptLogger): """ ConsoleTranscriptLogger writes activities to Console output. This is a DEBUG class, intended for testing diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_memory_store.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_memory_store.py index 5c8d0764..478fc561 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_memory_store.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_memory_store.py @@ -4,7 +4,7 @@ from threading import Lock from datetime import datetime, timezone from typing import List -from .transcript_logger import TranscriptLogger +from .transcript_logger import TranscriptLogger, PagedResult from .transcript_info import TranscriptInfo from microsoft_agents.activity import Activity @@ -52,7 +52,7 @@ async def get_transcript_activities( conversation_id: str, continuation_token: str = None, start_date: datetime = datetime.min.replace(tzinfo=timezone.utc), - ) -> tuple[list[Activity], str]: + ) -> PagedResult: """ Retrieves activities for a given channel and conversation, optionally filtered by start_date. @@ -60,7 +60,7 @@ async def get_transcript_activities( :param conversation_id: The conversation ID to filter activities. :param continuation_token: (Unused) Token for pagination. :param start_date: Only activities with timestamp >= start_date are returned. None timestamps are treated as datetime.min. - :return: A tuple containing the filtered list of Activity objects and a continuation token (always None). + :return: A PagedResult containing the filtered list of Activity objects and a continuation token (always None). :raises ValueError: If channel_id or conversation_id is None. """ if not channel_id: @@ -98,7 +98,7 @@ async def get_transcript_activities( >= start_date ] - return filtered_sorted_activities, None + return PagedResult(items=filtered_sorted_activities, continuation_token=None) async def delete_transcript(self, channel_id: str, conversation_id: str) -> None: """ @@ -126,13 +126,13 @@ async def delete_transcript(self, channel_id: str, conversation_id: str) -> None async def list_transcripts( self, channel_id: str, continuation_token: str = None - ) -> tuple[list[TranscriptInfo], str]: + ) -> PagedResult: """ Lists all transcripts (unique conversation IDs) for a given channel. :param channel_id: The channel ID to list transcripts for. :param continuation_token: (Unused) Token for pagination. - :return: A tuple containing a list of TranscriptInfo objects and a continuation token (always None). + :return: A PagedResult containing a list of TranscriptInfo objects and a continuation token (always None). :raises ValueError: If channel_id is None. 
""" if not channel_id: @@ -151,4 +151,4 @@ async def list_transcripts( TranscriptInfo(channel_id=channel_id, conversation_id=conversation_id) for conversation_id in conversations ] - return transcript_infos, None + return PagedResult(items=transcript_infos, continuation_token=None) diff --git a/tests/hosting_core/storage/test_file_transcript_storage.py b/tests/hosting_core/storage/test_file_transcript_storage.py new file mode 100644 index 00000000..1a62e2f5 --- /dev/null +++ b/tests/hosting_core/storage/test_file_transcript_storage.py @@ -0,0 +1,154 @@ +import asyncio +import json +import os +import tempfile +from datetime import datetime, timedelta, timezone +from pathlib import Path + +import pytest +import pytest_asyncio + +from microsoft_agents.hosting.core.storage import FileTranscriptStore, PagedResult + +@pytest_asyncio.fixture +async def temp_logger(): + """Create a temporary logger with an isolated folder.""" + with tempfile.TemporaryDirectory() as tmpdir: + logger = FileTranscriptStore(tmpdir) + yield logger + + +def make_activity(channel="testChannel", conv="conv1", text="hello"): + return { + "id": "activity1", + "type": "message", + "channel_id": channel, + "conversation": {"id": conv}, + "text": text, + } + + +# ---------------------------- +# log_activity +# ---------------------------- + +@pytest.mark.asyncio +async def test_log_activity_creates_file(temp_logger: FileTranscriptStore): + activity = make_activity() + await temp_logger.log_activity(activity) + + file_path = Path(temp_logger._root) / "testChannel" / "conv1.transcript" + assert file_path.exists(), "Transcript file should be created" + + contents = file_path.read_text(encoding="utf-8").strip() + assert contents, "Transcript file should not be empty" + data = json.loads(contents) + assert data["text"] == "hello" + assert data["conversation"]["id"] == "conv1" + + +@pytest.mark.asyncio +async def test_log_activity_appends_multiple_lines(temp_logger: FileTranscriptStore): + activity1 = make_activity(text="first") + activity2 = make_activity(text="second") + await temp_logger.log_activity(activity1) + await temp_logger.log_activity(activity2) + + file_path = Path(temp_logger._root) / "testChannel" / "conv1.transcript" + lines = file_path.read_text(encoding="utf-8").splitlines() + assert len(lines) == 2 + texts = [json.loads(l)["text"] for l in lines] + assert texts == ["first", "second"] + + +# ---------------------------- +# list_transcripts +# ---------------------------- + +@pytest.mark.asyncio +async def test_list_transcripts_returns_conversations(temp_logger: FileTranscriptStore): + await temp_logger.log_activity(make_activity(conv="convA")) + await temp_logger.log_activity(make_activity(conv="convB")) + result = await temp_logger.list_transcripts("testChannel") + + assert isinstance(result, PagedResult) + ids = [t.conversation_id for t in result.items] + assert {"convA", "convB"} <= set(ids) + + +@pytest.mark.asyncio +async def test_list_transcripts_empty_channel(temp_logger: FileTranscriptStore): + result = await temp_logger.list_transcripts("nonexistent") + assert isinstance(result, PagedResult) + assert result.items == [] + + +# ---------------------------- +# get_transcript_activities +# ---------------------------- + +@pytest.mark.asyncio +async def test_get_transcript_activities_reads_logged_items(temp_logger: FileTranscriptStore): + for i in range(3): + await temp_logger.log_activity(make_activity(conv="convX", text=f"msg{i}")) + + result = await temp_logger.get_transcript_activities("testChannel", 
"convX") + texts = [a["text"] for a in result.items] + assert texts == ["msg0", "msg1", "msg2"] + assert result.continuation_token is None + + +@pytest.mark.asyncio +async def test_get_transcript_activities_with_paging(temp_logger: FileTranscriptStore): + # Add many lines to force paging + for i in range(50): + await temp_logger.log_activity(make_activity(conv="paged", text=f"msg{i}")) + + first = await temp_logger.get_transcript_activities("testChannel", "paged", page_bytes=300) + assert len(first.items) > 0 + assert first.continuation_token is not None + + second = await temp_logger.get_transcript_activities( + "testChannel", "paged", continuation_token=first.continuation_token, page_bytes=300 + ) + assert len(second.items) > 0 + assert all("msg" in a["text"] for a in second.items) + + +@pytest.mark.asyncio +async def test_get_transcript_activities_with_start_date_filter(temp_logger: FileTranscriptStore): + old_ts = (datetime.now(timezone.utc) - timedelta(days=2)).isoformat() + new_ts = datetime.now(timezone.utc).isoformat() + + activity1 = make_activity(conv="filtered", text="old") + activity2 = make_activity(conv="filtered", text="new") + activity1["timestamp"] = old_ts + activity2["timestamp"] = new_ts + + await temp_logger.log_activity(activity1) + await temp_logger.log_activity(activity2) + + start_date = datetime.now(timezone.utc) - timedelta(days=1) + result = await temp_logger.get_transcript_activities("testChannel", "filtered", start_date=start_date) + texts = [a["text"] for a in result.items] + assert texts == ["new"] + + +# ---------------------------- +# delete_transcript +# ---------------------------- + +@pytest.mark.asyncio +async def test_delete_transcript_removes_file(temp_logger: FileTranscriptStore): + await temp_logger.log_activity(make_activity(conv="todelete")) + file_path = Path(temp_logger._root) / "testChannel" / "todelete.transcript" + assert file_path.exists() + + await temp_logger.delete_transcript("testChannel", "todelete") + assert not file_path.exists() + + +@pytest.mark.asyncio +async def test_delete_transcript_nonexistent(temp_logger: FileTranscriptStore): + # Should not raise any errors + await temp_logger.delete_transcript("channel", "nonexistent") diff --git a/tests/hosting_core/storage/test_transcript_logger_middleware.py b/tests/hosting_core/storage/test_transcript_logger_middleware.py index f63db030..7065baa2 100644 --- a/tests/hosting_core/storage/test_transcript_logger_middleware.py +++ b/tests/hosting_core/storage/test_transcript_logger_middleware.py @@ -41,18 +41,15 @@ async def callback(tc): await adapter.process_activity(id, a1, callback) - transcriptAndContinuationToken = await transcript_store.get_transcript_activities( + pagedResult = await transcript_store.get_transcript_activities( channelName, conversation_id ) - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - - assert len(transcript) == 1 - assert transcript[0].channel_id == channelName - assert transcript[0].conversation.id == conversation_id - assert transcript[0].text == a1.text - assert continuationToken is None + assert len(pagedResult.items) == 1 + assert pagedResult.items[0].channel_id == channelName + assert pagedResult.items[0].conversation.id == conversation_id + assert pagedResult.items[0].text == a1.text + assert pagedResult.continuation_token is None @pytest.mark.asyncio diff --git a/tests/hosting_core/storage/test_transcript_store_memory.py b/tests/hosting_core/storage/test_transcript_store_memory.py index 
7d11e752..2d3fb631 100644 --- a/tests/hosting_core/storage/test_transcript_store_memory.py +++ b/tests/hosting_core/storage/test_transcript_store_memory.py @@ -4,7 +4,7 @@ from datetime import datetime, timezone import pytest from microsoft_agents.hosting.core.storage.transcript_memory_store import ( - TranscriptMemoryStore, + TranscriptMemoryStore, PagedResult ) from microsoft_agents.activity import Activity, ConversationAccount @@ -12,14 +12,11 @@ @pytest.mark.asyncio async def test_get_transcript_empty(): store = TranscriptMemoryStore() - transcriptAndContinuationToken = await store.get_transcript_activities( + pagedResult = await store.get_transcript_activities( "Channel 1", "Conversation 1" ) - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - assert transcript == [] - assert continuationToken is None - + assert pagedResult.items == [] + assert pagedResult.continuation_token is None @pytest.mark.asyncio async def test_log_activity_add_one_activity(): @@ -33,35 +30,29 @@ async def test_log_activity_add_one_activity(): await store.log_activity(activity) # Ask for the activity we just added - transcriptAndContinuationToken = await store.get_transcript_activities( + pagedResult = await store.get_transcript_activities( "Channel 1", "Conversation 1" ) - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - assert len(transcript) == 1 - assert transcript[0].channel_id == activity.channel_id - assert transcript[0].conversation.id == activity.conversation.id - assert transcript[0].text == activity.text - assert continuationToken is None + assert len(pagedResult.items) == 1 + assert pagedResult.items[0].channel_id == activity.channel_id + assert pagedResult.items[0].conversation.id == activity.conversation.id + assert pagedResult.items[0].text == activity.text + assert pagedResult.continuation_token is None # Ask for a channel that doesn't exist and make sure we get nothing - transcriptAndContinuationToken = await store.get_transcript_activities( + pagedResult = await store.get_transcript_activities( "Invalid", "Conversation 1" ) - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - assert transcript == [] - assert continuationToken is None + assert pagedResult.items == [] + assert pagedResult.continuation_token is None # Ask for a ConversationID that doesn't exist and make sure we get nothing - transcriptAndContinuationToken = await store.get_transcript_activities( + pagedResult = await store.get_transcript_activities( "Channel 1", "INVALID" ) - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - assert transcript == [] - assert continuationToken is None + assert pagedResult.items == [] + assert pagedResult.continuation_token is None @pytest.mark.asyncio @@ -81,26 +72,23 @@ async def test_log_activity_add_two_activity_same_conversation(): await store.log_activity(activity2) # Ask for the activity we just added - transcriptAndContinuationToken = await store.get_transcript_activities( + pagedResult = await store.get_transcript_activities( "Channel 1", "Conversation 1" ) - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - - assert len(transcript) == 2 - assert transcript[0].channel_id == activity1.channel_id - assert transcript[0].conversation.id == activity1.conversation.id - assert transcript[0].text == activity1.text - assert 
transcript[1].channel_id == activity2.channel_id - assert transcript[1].conversation.id == activity2.conversation.id - assert transcript[1].text == activity2.text + assert len(pagedResult.items) == 2 + assert pagedResult.items[0].channel_id == activity1.channel_id + assert pagedResult.items[0].conversation.id == activity1.conversation.id + assert pagedResult.items[0].text == activity1.text - assert continuationToken is None + assert pagedResult.items[1].channel_id == activity2.channel_id + assert pagedResult.items[1].conversation.id == activity2.conversation.id + assert pagedResult.items[1].text == activity2.text + assert pagedResult.continuation_token is None @pytest.mark.asyncio -async def test_log_activity_add_two_activity_same_conversation(): +async def test_log_activity_add_three_activity_same_conversation(): store = TranscriptMemoryStore() activity1 = Activity.create_message_activity() activity1.text = "Activity 1" @@ -130,28 +118,22 @@ async def test_log_activity_add_two_activity_same_conversation(): date3 = datetime(2019, 1, 1, 12, 0, 0, tzinfo=timezone.utc) # ask for everything after 1999. Should get all 3 activities - transcriptAndContinuationToken = await store.get_transcript_activities( + pagedResult = await store.get_transcript_activities( "Channel 1", "Conversation 1", None, date1 ) - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - assert len(transcript) == 3 + assert len(pagedResult.items) == 3 # ask for everything after 2009. Should get 2 activities - the 2010 and 2020 activities - transcriptAndContinuationToken = await store.get_transcript_activities( + pagedResult = await store.get_transcript_activities( "Channel 1", "Conversation 1", None, date2 ) - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - assert len(transcript) == 2 + assert len(pagedResult.items) == 2 # ask for everything after 2019. 
Should only get the 2020 activity - transcriptAndContinuationToken = await store.get_transcript_activities( + pagedResult = await store.get_transcript_activities( "Channel 1", "Conversation 1", None, date3 ) - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - assert len(transcript) == 1 + assert len(pagedResult.items) == 1 @pytest.mark.asyncio @@ -171,31 +153,26 @@ async def test_log_activity_add_two_activity_two_conversation(): await store.log_activity(activity2) # Ask for the activity we just added - transcriptAndContinuationToken = await store.get_transcript_activities( + pagedResult = await store.get_transcript_activities( "Channel 1", "Conversation 1" ) - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - assert len(transcript) == 1 - assert transcript[0].channel_id == activity1.channel_id - assert transcript[0].conversation.id == activity1.conversation.id - assert transcript[0].text == activity1.text - assert continuationToken is None + assert len(pagedResult.items) == 1 + assert pagedResult.items[0].channel_id == activity1.channel_id + assert pagedResult.items[0].conversation.id == activity1.conversation.id + assert pagedResult.items[0].text == activity1.text + assert pagedResult.continuation_token is None # Now grab Conversation 2 - transcriptAndContinuationToken = await store.get_transcript_activities( + pagedResult = await store.get_transcript_activities( "Channel 1", "Conversation 2" ) - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - - assert len(transcript) == 1 - assert transcript[0].channel_id == activity2.channel_id - assert transcript[0].conversation.id == activity2.conversation.id - assert transcript[0].text == activity2.text - assert continuationToken is None + assert len(pagedResult.items) == 1 + assert pagedResult.items[0].channel_id == activity2.channel_id + assert pagedResult.items[0].conversation.id == activity2.conversation.id + assert pagedResult.items[0].text == activity2.text + assert pagedResult.continuation_token is None @pytest.mark.asyncio async def test_delete_one_transcript(): @@ -209,21 +186,18 @@ async def test_delete_one_transcript(): await store.log_activity(activity) # Ask for the activity we just added - transcriptAndContinuationToken = await store.get_transcript_activities( + pagedResult = await store.get_transcript_activities( "Channel 1", "Conversation 1" ) - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - assert len(transcript) == 1 + assert len(pagedResult.items) == 1 # Now delete the transcript await store.delete_transcript("Channel 1", "Conversation 1") - transcriptAndContinuationToken = await store.get_transcript_activities( + pagedResult = await store.get_transcript_activities( "Channel 1", "Conversation 1" - ) - transcript = transcriptAndContinuationToken[0] - assert len(transcript) == 0 + ) + assert len(pagedResult.items) == 0 @pytest.mark.asyncio @@ -250,19 +224,16 @@ async def test_delete_one_transcript_of_two(): await store.delete_transcript("Channel 1", "Conversation 1") # Make sure the one we deleted is gone - transcriptAndContinuationToken = await store.get_transcript_activities( + pagedResult = await store.get_transcript_activities( "Channel 1", "Conversation 1" ) - transcript = transcriptAndContinuationToken[0] - assert len(transcript) == 0 + assert len(pagedResult.items) == 0 # Make sure the other one is still 
there - transcriptAndContinuationToken = await store.get_transcript_activities( + pagedResult = await store.get_transcript_activities( "Channel 2", "Conversation 1" - ) - transcript = transcriptAndContinuationToken[0] - assert len(transcript) == 1 - + ) + assert len(pagedResult.items) == 1 @pytest.mark.asyncio async def test_list_transcripts(): @@ -279,34 +250,26 @@ async def test_list_transcripts(): activity2.conversation = ConversationAccount(id="Conversation 1") # Make sure a list on an empty store returns an empty set - transcriptAndContinuationToken = await store.list_transcripts("Should Be Empty") - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - assert len(transcript) == 0 - assert continuationToken is None + pagedResult = await store.list_transcripts("Should Be Empty") + assert len(pagedResult.items) == 0 + assert pagedResult.continuation_token is None # Add one activity so we can go searching await store.log_activity(activity) - transcriptAndContinuationToken = await store.list_transcripts("Channel 1") - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - assert len(transcript) == 1 - assert continuationToken is None + pagedResult = await store.list_transcripts("Channel 1") + assert len(pagedResult.items) == 1 + assert pagedResult.continuation_token is None # Add second activity on a different channel, so now we have 2 transcripts await store.log_activity(activity2) # Check again for "Transcript 1" which is on channel 1 - transcriptAndContinuationToken = await store.list_transcripts("Channel 1") - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - assert len(transcript) == 1 - assert continuationToken is None + pagedResult = await store.list_transcripts("Channel 1") + assert len(pagedResult.items) == 1 + assert pagedResult.continuation_token is None # Check for "Transcript 2" which is on channel 2 - transcriptAndContinuationToken = await store.list_transcripts("Channel 2") - transcript = transcriptAndContinuationToken[0] - continuationToken = transcriptAndContinuationToken[1] - assert len(transcript) == 1 - assert continuationToken is None + pagedResult = await store.list_transcripts("Channel 2") + assert len(pagedResult.items) == 1 + assert pagedResult.continuation_token is None From 746645aa9e2e84ce2a2ee2c050925f1086f2253e Mon Sep 17 00:00:00 2001 From: Chris Mullins Date: Mon, 6 Oct 2025 15:06:32 -0700 Subject: [PATCH 2/4] Run Black code formatter --- .../hosting/core/storage/__init__.py | 6 +++--- .../hosting/core/storage/transcript_file_store.py | 15 ++++++++++++--- .../hosting/core/storage/transcript_logger.py | 3 +++ .../core/storage/transcript_memory_store.py | 4 +++- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/__init__.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/__init__.py index f53ead89..21c334cb 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/__init__.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/__init__.py @@ -7,7 +7,7 @@ ConsoleTranscriptLogger, TranscriptLoggerMiddleware, FileTranscriptLogger, - PagedResult + PagedResult, ) from .transcript_store import TranscriptStore from .transcript_file_store import FileTranscriptStore @@ -24,5 +24,5 @@ "TranscriptStore", "FileTranscriptLogger", 
"FileTranscriptStore", - "PagedResult" -] \ No newline at end of file + "PagedResult", +] diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py index 3d1102af..9356e35e 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py @@ -18,6 +18,7 @@ from microsoft_agents.activity import Activity # type: ignore + class FileTranscriptStore(TranscriptLogger): """ Python port of the .NET FileTranscriptStore which creates a single @@ -36,7 +37,7 @@ class FileTranscriptStore(TranscriptLogger): * Filenames are sanitized to avoid path traversal and invalid characters. Inspired by the .NET design for FileTranscriptLogger. See: - - Microsoft.Bot.Builder FileTranscriptLogger docs (for behavior) [DOTNET] + - Microsoft.Bot.Builder FileTranscriptLogger docs (for behavior) [DOTNET] - Microsoft.Agents.Storage.Transcript namespace overview [AGENTS] """ @@ -63,7 +64,7 @@ async def log_activity(self, activity: Activity) -> None: """ if not activity: raise ValueError("Activity is required") - + channel_id, conversation_id = _get_ids(activity) file_path = self._file_path(channel_id, conversation_id) file_path.parent.mkdir(parents=True, exist_ok=True) @@ -221,12 +222,14 @@ def _file_path(self, channel_id: str, conversation_id: str) -> Path: # Module-level helpers # ---------------------------- + def _sanitize(pattern: re.Pattern[str], value: str) -> str: # Replace path-separators and illegal filename chars with '-' value = (value or "").strip().replace(os.sep, "-").replace("/", "-") value = pattern.sub("-", value) return value or "unknown" + def _get_ids(activity: Activity) -> Tuple[str, str]: # Works with both dict-like and object-like Activity def _get(obj: Any, *path: str) -> Optional[Any]: @@ -246,18 +249,22 @@ def _get(obj: Any, *path: str) -> Optional[Any]: raise ValueError("Activity must include channel_id and conversation.id") return str(channel_id), str(conversation_id) + def _to_plain_dict(activity: Activity) -> Dict[str, Any]: if isinstance(activity, dict): return activity # Best-effort conversion for dataclass/attr/objects try: import dataclasses + if dataclasses.is_dataclass(activity): return dataclasses.asdict(activity) # type: ignore[arg-type] except Exception: pass try: - return json.loads(json.dumps(activity, default=lambda o: getattr(o, "__dict__", str(o)))) + return json.loads( + json.dumps(activity, default=lambda o: getattr(o, "__dict__", str(o))) + ) except Exception: # Fallback: minimal projection channel_id, conversation_id = _get_ids(activity) @@ -269,9 +276,11 @@ def _to_plain_dict(activity: Activity) -> Dict[str, Any]: "text": getattr(activity, "text", None), } + def _utc_iso_now() -> str: return datetime.now(timezone.utc).isoformat() + def _parse_iso_utc(value: Optional[str]) -> Optional[datetime]: if not value: return None diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_logger.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_logger.py index 7032cb63..1a819ff7 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_logger.py +++ 
b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_logger.py @@ -18,11 +18,13 @@ from microsoft_agents.activity.conversation_reference import ActivityEventNames from microsoft_agents.hosting.core.middleware_set import Middleware, TurnContext + @dataclass class PagedResult: items: List[Any] continuation_token: Optional[str] = None + class TranscriptLogger(ABC): @abstractmethod async def log_activity(self, activity: Activity) -> None: @@ -33,6 +35,7 @@ async def log_activity(self, activity: Activity) -> None: """ pass + class ConsoleTranscriptLogger(TranscriptLogger): """ ConsoleTranscriptLogger writes activities to Console output. This is a DEBUG class, intended for testing diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_memory_store.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_memory_store.py index 478fc561..d508cedd 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_memory_store.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_memory_store.py @@ -98,7 +98,9 @@ async def get_transcript_activities( >= start_date ] - return PagedResult(items=filtered_sorted_activities, continuation_token=None) + return PagedResult( + items=filtered_sorted_activities, continuation_token=None + ) async def delete_transcript(self, channel_id: str, conversation_id: str) -> None: """ From 9e545db15f29f43d1b28ee2747539caf5a529f51 Mon Sep 17 00:00:00 2001 From: Chris Mullins Date: Mon, 6 Oct 2025 16:21:16 -0700 Subject: [PATCH 3/4] Refactor transcript storage to use Activity type in PagedResult and improve timestamp handling --- .../core/storage/transcript_file_store.py | 36 ++++------- .../hosting/core/storage/transcript_logger.py | 8 ++- .../core/storage/transcript_memory_store.py | 4 +- .../storage/test_file_transcript_storage.py | 62 ++++++++++++------- 4 files changed, 62 insertions(+), 48 deletions(-) diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py index 9356e35e..4429966b 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py @@ -73,18 +73,18 @@ async def log_activity(self, activity: Activity) -> None: # Write in a background thread to avoid blocking the event loop def _write() -> None: # Normalize to a dict to ensure json serializable content. - obj = _to_plain_dict(activity) - obj.setdefault("timestamp", _utc_iso_now()) + if not activity.timestamp: + activity.timestamp = _utc_iso_now() with open(file_path, "a", encoding="utf-8", newline="\n") as f: - json.dump(obj, f, ensure_ascii=False, separators=(",", ":")) + f.write(activity.model_dump_json(exclude_none=True, exclude_unset=True)) f.write("\n") await asyncio.to_thread(_write) # -------- Store surface -------- - async def list_transcripts(self, channel_id: str) -> PagedResult: + async def list_transcripts(self, channel_id: str) -> PagedResult[TranscriptInfo]: """ List transcripts (conversations) for a channel. 
@@ -120,7 +120,7 @@ async def get_transcript_activities( continuation_token: Optional[str] = None, start_date: Optional[datetime] = None, page_bytes: int = 512 * 1024, - ) -> PagedResult: + ) -> PagedResult[Activity]: """ Read activities from the transcript file (paged by byte size). @@ -143,12 +143,12 @@ async def get_transcript_activities( """ file_path = self._file_path(channel_id, conversation_id) - def _read_page() -> Tuple[List[Dict[str, Any]], Optional[str]]: + def _read_page() -> Tuple[List[Activity], Optional[str]]: if not file_path.exists(): return [], None offset = int(continuation_token) if continuation_token else 0 - results: List[Dict[str, Any]] = [] + results: List[Activity] = [] with open(file_path, "rb") as f: f.seek(0, os.SEEK_END) @@ -176,15 +176,16 @@ def _read_page() -> Tuple[List[Dict[str, Any]], Optional[str]]: # Parse JSONL for ln in lines: try: - obj = json.loads(ln) + a = Activity.model_validate_json(ln) except Exception: # Skip malformed lines continue if start_date: - ts = _parse_iso_utc(obj.get("timestamp")) - if ts and ts < start_date.astimezone(timezone.utc): + if a.timestamp and a.timestamp < start_date.astimezone( + timezone.utc + ): continue - results.append(obj) + results.append(a) token = str(next_offset) if next_offset < end else None return results, token @@ -251,6 +252,7 @@ def _get(obj: Any, *path: str) -> Optional[Any]: def _to_plain_dict(activity: Activity) -> Dict[str, Any]: + if isinstance(activity, dict): return activity # Best-effort conversion for dataclass/attr/objects @@ -279,15 +281,3 @@ def _to_plain_dict(activity: Activity) -> Dict[str, Any]: def _utc_iso_now() -> str: return datetime.now(timezone.utc).isoformat() - - -def _parse_iso_utc(value: Optional[str]) -> Optional[datetime]: - if not value: - return None - try: - # Allow fromisoformat with 'Z' or offset - if value.endswith("Z"): - value = value[:-1] + "+00:00" - return datetime.fromisoformat(value).astimezone(timezone.utc) - except Exception: - return None diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_logger.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_logger.py index 1a819ff7..72d0c5ef 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_logger.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_logger.py @@ -17,11 +17,15 @@ from microsoft_agents.activity.activity_types import ActivityTypes from microsoft_agents.activity.conversation_reference import ActivityEventNames from microsoft_agents.hosting.core.middleware_set import Middleware, TurnContext +from typing import Generic, TypeVar + + +T = TypeVar("T") @dataclass -class PagedResult: - items: List[Any] +class PagedResult(Generic[T]): + items: List[T] continuation_token: Optional[str] = None diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_memory_store.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_memory_store.py index d508cedd..6bc170f1 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_memory_store.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_memory_store.py @@ -52,7 +52,7 @@ async def get_transcript_activities( conversation_id: str, continuation_token: str = None, start_date: datetime = datetime.min.replace(tzinfo=timezone.utc), 
- ) -> PagedResult: + ) -> PagedResult[Activity]: """ Retrieves activities for a given channel and conversation, optionally filtered by start_date. @@ -128,7 +128,7 @@ async def delete_transcript(self, channel_id: str, conversation_id: str) -> None async def list_transcripts( self, channel_id: str, continuation_token: str = None - ) -> PagedResult: + ) -> PagedResult[TranscriptInfo]: """ Lists all transcripts (unique conversation IDs) for a given channel. diff --git a/tests/hosting_core/storage/test_file_transcript_storage.py b/tests/hosting_core/storage/test_file_transcript_storage.py index 1a62e2f5..84eb685e 100644 --- a/tests/hosting_core/storage/test_file_transcript_storage.py +++ b/tests/hosting_core/storage/test_file_transcript_storage.py @@ -9,6 +9,8 @@ import pytest_asyncio from microsoft_agents.hosting.core.storage import FileTranscriptStore, PagedResult +from microsoft_agents.activity import Activity # type: ignore + @pytest_asyncio.fixture async def temp_logger(): @@ -18,20 +20,23 @@ async def temp_logger(): yield logger -def make_activity(channel="testChannel", conv="conv1", text="hello"): - return { - "id": "activity1", - "type": "message", - "channel_id": channel, - "conversation": {"id": conv}, - "text": text, - } +def make_activity(channel="testChannel", conv="conv1", text="hello") -> Activity: + activity = Activity( + id="activity1", + type="message", + channel_id=channel, + conversation={"id": conv}, + text=text, + ) + + return activity # ---------------------------- # log_activity # ---------------------------- + @pytest.mark.asyncio async def test_log_activity_creates_file(temp_logger: FileTranscriptStore): activity = make_activity() @@ -42,9 +47,10 @@ async def test_log_activity_creates_file(temp_logger: FileTranscriptStore): contents = file_path.read_text(encoding="utf-8").strip() assert contents, "Transcript file should not be empty" - data = json.loads(contents) - assert data["text"] == "hello" - assert data["conversation"]["id"] == "conv1" + + a = Activity.model_validate_json(contents) + assert a.text == "hello" + assert a.conversation.id == "conv1" @pytest.mark.asyncio @@ -65,6 +71,7 @@ async def test_log_activity_appends_multiple_lines(temp_logger: FileTranscriptSt # list_transcripts # ---------------------------- + @pytest.mark.asyncio async def test_list_transcripts_returns_conversations(temp_logger: FileTranscriptStore): await temp_logger.log_activity(make_activity(conv="convA")) @@ -87,13 +94,16 @@ async def test_list_transcripts_empty_channel(temp_logger: FileTranscriptStore): # get_transcript_activities # ---------------------------- + @pytest.mark.asyncio -async def test_get_transcript_activities_reads_logged_items(temp_logger: FileTranscriptStore): +async def test_get_transcript_activities_reads_logged_items( + temp_logger: FileTranscriptStore, +): for i in range(3): await temp_logger.log_activity(make_activity(conv="convX", text=f"msg{i}")) result = await temp_logger.get_transcript_activities("testChannel", "convX") - texts = [a["text"] for a in result.items] + texts = [a.text for a in result.items] assert texts == ["msg0", "msg1", "msg2"] assert result.continuation_token is None @@ -104,33 +114,42 @@ async def test_get_transcript_activities_with_paging(temp_logger: FileTranscript for i in range(50): await temp_logger.log_activity(make_activity(conv="paged", text=f"msg{i}")) - first = await temp_logger.get_transcript_activities("testChannel", "paged", page_bytes=300) + first = await temp_logger.get_transcript_activities( + "testChannel", "paged", 
page_bytes=300 + ) assert len(first.items) > 0 assert first.continuation_token is not None second = await temp_logger.get_transcript_activities( - "testChannel", "paged", continuation_token=first.continuation_token, page_bytes=300 + "testChannel", + "paged", + continuation_token=first.continuation_token, + page_bytes=300, ) assert len(second.items) > 0 - assert all("msg" in a["text"] for a in second.items) + assert all("msg" in a.text for a in second.items) @pytest.mark.asyncio -async def test_get_transcript_activities_with_start_date_filter(temp_logger: FileTranscriptStore): +async def test_get_transcript_activities_with_start_date_filter( + temp_logger: FileTranscriptStore, +): old_ts = (datetime.now(timezone.utc) - timedelta(days=2)).isoformat() new_ts = datetime.now(timezone.utc).isoformat() activity1 = make_activity(conv="filtered", text="old") activity2 = make_activity(conv="filtered", text="new") - activity1["timestamp"] = old_ts - activity2["timestamp"] = new_ts + activity1.timestamp = old_ts + activity2.timestamp = new_ts await temp_logger.log_activity(activity1) await temp_logger.log_activity(activity2) start_date = datetime.now(timezone.utc) - timedelta(days=1) - result = await temp_logger.get_transcript_activities("testChannel", "filtered", start_date=start_date) - texts = [a["text"] for a in result.items] + result = await temp_logger.get_transcript_activities( + "testChannel", "filtered", start_date=start_date + ) + texts = [a.text for a in result.items] assert texts == ["new"] @@ -138,6 +157,7 @@ async def test_get_transcript_activities_with_start_date_filter(temp_logger: Fil # delete_transcript # ---------------------------- + @pytest.mark.asyncio async def test_delete_transcript_removes_file(temp_logger: FileTranscriptStore): await temp_logger.log_activity(make_activity(conv="todelete")) From 39e13d1c3458578764693ba3a6d84aaa8172b44e Mon Sep 17 00:00:00 2001 From: Chris Mullins Date: Mon, 6 Oct 2025 16:26:10 -0700 Subject: [PATCH 4/4] Enhance documentation for log_activity and list_transcripts methods in FileTranscriptStore --- .../core/storage/transcript_file_store.py | 42 ++++++------------- 1 file changed, 13 insertions(+), 29 deletions(-) diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py index 4429966b..7f9d37ad 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/storage/transcript_file_store.py @@ -52,15 +52,12 @@ def __init__(self, root_folder: Union[str, Path]) -> None: async def log_activity(self, activity: Activity) -> None: """ - Append a single Activity to its conversation transcript. - - Parameters - ---------- - activity : Activity - An activity object (from microsoft_agents.activity.Activity or a dict with similar shape) - Must contain: - - channel_id (str) -> activity["channel_id"] or activity.channel_id - - conversation.id (str) -> activity["conversation"]["id"] or activity.conversation.id + Asynchronously persist a transcript activity to the file system. + This method computes the transcript file path based on the activity’s channel + and conversation identifiers, ensures the directory exists, and appends the + activity data to the transcript file in JSON format using a background thread. 
+ If the activity lacks a timestamp, one is assigned prior to serialization. + :param activity: The activity to log. """ if not activity: raise ValueError("Activity is required") @@ -87,9 +84,7 @@ def _write() -> None: async def list_transcripts(self, channel_id: str) -> PagedResult[TranscriptInfo]: """ List transcripts (conversations) for a channel. - - Returns TranscriptInfo for each `.transcript` found. - """ + :param channel_id: The channel ID to list transcripts for.""" channel_dir = self._channel_dir(channel_id) def _list() -> List[TranscriptInfo]: @@ -123,23 +118,12 @@ async def get_transcript_activities( ) -> PagedResult[Activity]: """ Read activities from the transcript file (paged by byte size). - - Parameters - ---------- - continuation_token : str, optional - An opaque token from a previous call. In this implementation it's the - byte offset (str(int)). Omit or pass None to start at the beginning. - start_date : datetime, optional - If provided, only activities with `timestamp >= start_date` are returned. - (Compares the 'timestamp' property when present; otherwise includes the line.) - page_bytes : int - Max bytes to read from file on this call (not a hard activity count limit). - - Returns - ------- - PagedResult with: - - items: List[Activity-like dicts] - - continuation_token: str offset for next page, or None if EOF + :param channel_id: The channel ID of the conversation. + :param conversation_id: The conversation ID to read activities from. + :param continuation_token: Optional continuation token (byte offset as string). + :param start_date: Optional filter to only include activities on or after this date. + :param page_bytes: Maximum number of bytes to read (default: 512kB). + :return: A PagedResult containing a list of Activities and an optional continuation token. """ file_path = self._file_path(channel_id, conversation_id)
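
A minimal usage sketch of the FileTranscriptStore introduced by this series, reflecting its shape after PATCH 3/4 (activities round-trip as Activity models via model_dump_json / model_validate_json, and PagedResult is generic). It mirrors the test fixtures above; the temporary root folder, the channel/conversation IDs, and the page_bytes value are illustrative choices for the example, not part of the patches.

import asyncio
import tempfile

from microsoft_agents.activity import Activity
from microsoft_agents.hosting.core.storage import FileTranscriptStore


async def main() -> None:
    with tempfile.TemporaryDirectory() as root:
        store = FileTranscriptStore(root)

        # Each logged activity is appended as one JSONL line to
        # <root>/<channel_id>/<conversation_id>.transcript.
        for i in range(3):
            await store.log_activity(
                Activity(
                    id=f"activity-{i}",
                    type="message",
                    channel_id="testChannel",
                    conversation={"id": "conv1"},
                    text=f"msg{i}",
                )
            )

        # Page through the conversation. The continuation token is the byte
        # offset of the next unread line, so the loop stops once it comes
        # back as None.
        token = None
        while True:
            page = await store.get_transcript_activities(
                "testChannel", "conv1", continuation_token=token, page_bytes=256
            )
            for activity in page.items:
                print(activity.text)
            token = page.continuation_token
            if token is None:
                break

        # Enumerate conversations on the channel, then clean up.
        listed = await store.list_transcripts("testChannel")
        print([t.conversation_id for t in listed.items])
        await store.delete_transcript("testChannel", "conv1")


if __name__ == "__main__":
    asyncio.run(main())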