diff --git a/README.md b/README.md index d6ddd711..625d94e7 100644 --- a/README.md +++ b/README.md @@ -1459,6 +1459,29 @@ You can choose to use local services (Wyoming/Ollama) or OpenAI services by sett │ --port INTEGER Port to bind to │ │ [default: 8100] │ ╰──────────────────────────────────────────────────────────────────────────────╯ +╭─ Long Conversation Mode ─────────────────────────────────────────────────────╮ +│ --long-conversation --no-long-conversa… Enable long │ +│ conversation mode │ +│ with asymmetric │ +│ compression. │ +│ [default: │ +│ no-long-conversatio… │ +│ --context-budget INTEGER Target context │ +│ window size in │ +│ tokens │ +│ (long-conversation │ +│ mode). │ +│ [default: 150000] │ +│ --compress-threshold FLOAT Start compression │ +│ when context reaches │ +│ this fraction of │ +│ budget. │ +│ [default: 0.8] │ +│ --raw-recent-tokens INTEGER Always keep this │ +│ many recent tokens │ +│ uncompressed. │ +│ [default: 40000] │ +╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ General Options ────────────────────────────────────────────────────────────╮ │ --log-level TEXT Set logging level. │ │ [default: WARNING] │ diff --git a/agent_cli/agents/memory_proxy.py b/agent_cli/agents/memory_proxy.py index 247bcc0f..0e3aea8f 100644 --- a/agent_cli/agents/memory_proxy.py +++ b/agent_cli/agents/memory_proxy.py @@ -66,6 +66,31 @@ def memory_proxy( help="Enable automatic git commit of memory changes.", rich_help_panel="Memory Configuration", ), + # Long conversation mode options + long_conversation: bool = typer.Option( + False, # noqa: FBT003 + "--long-conversation/--no-long-conversation", + help="Enable long conversation mode with asymmetric compression.", + rich_help_panel="Long Conversation Mode", + ), + context_budget: int = typer.Option( + 150_000, + "--context-budget", + help="Target context window size in tokens (long-conversation mode).", + rich_help_panel="Long Conversation Mode", + ), + compress_threshold: float = typer.Option( + 0.8, + "--compress-threshold", + help="Start compression when context reaches this fraction of budget.", + rich_help_panel="Long Conversation Mode", + ), + raw_recent_tokens: int = typer.Option( + 40_000, + "--raw-recent-tokens", + help="Always keep this many recent tokens uncompressed.", + rich_help_panel="Long Conversation Mode", + ), log_level: str = opts.LOG_LEVEL, config_file: str | None = opts.CONFIG_FILE, print_args: bool = opts.PRINT_ARGS, @@ -145,6 +170,13 @@ def memory_proxy( console.print(" ⚙️ Summaries: [red]disabled[/red]") if git_versioning: console.print(" 📝 Git Versioning: [green]enabled[/green]") + if long_conversation: + console.print(" 📜 Long Conversation Mode: [green]enabled[/green]") + console.print( + f" Context budget: [blue]{context_budget:,}[/blue] tokens, " + f"compress at [blue]{compress_threshold:.0%}[/blue], " + f"keep [blue]{raw_recent_tokens:,}[/blue] raw", + ) fastapi_app = create_app( memory_path, @@ -159,6 +191,11 @@ def memory_proxy( recency_weight=recency_weight, score_threshold=score_threshold, enable_git_versioning=git_versioning, + # Long conversation mode settings + long_conversation=long_conversation, + context_budget=context_budget, + compress_threshold=compress_threshold, + raw_recent_tokens=raw_recent_tokens, ) uvicorn.run(fastapi_app, host=host, port=port, log_config=None) diff --git a/agent_cli/memory/_long_conversation.py b/agent_cli/memory/_long_conversation.py new file mode 100644 index 00000000..8e8db1b3 --- /dev/null +++ 
b/agent_cli/memory/_long_conversation.py @@ -0,0 +1,974 @@ +"""Long conversation mode: chronological context with asymmetric compression.""" + +from __future__ import annotations + +import difflib +import hashlib +import json +import logging +import re +from dataclasses import dataclass +from datetime import UTC, datetime +from typing import TYPE_CHECKING, Any +from uuid import uuid4 + +import yaml + +from agent_cli.core.utils import atomic_write_text +from agent_cli.memory.entities import LongConversation, Segment + +if TYPE_CHECKING: + from pathlib import Path + +LOGGER = logging.getLogger(__name__) + +_LONG_CONVO_DIR = "long_conversations" +_SEGMENTS_DIR = "segments" +_METADATA_FILE = "metadata.json" +_FRONTMATTER_PARTS = 3 # Number of parts when splitting "---\nyaml\n---\ncontent" + +# Asymmetric compression configuration +COMPRESSION_CONFIG = { + "user": { + "recent_turns": 20, # Keep last N user turns raw + "summary_target_ratio": 0.7, # Compress to 70% (gentle) + "preserve_quotes": True, # Keep exact user phrasing + "preserve_code": True, # Never summarize code blocks + }, + "assistant": { + "recent_turns": 10, # Keep last N assistant turns raw + "summary_target_ratio": 0.2, # Compress to 20% (aggressive) + "keep_decisions": True, # Preserve: "I decided to...", "I'll use..." + "keep_conclusions": True, # Preserve final answers + }, +} + +# Summarization prompts for asymmetric compression +_USER_SUMMARIZE_PROMPT = """Summarize the following user message concisely while: +- Preserving ALL code blocks exactly as-is (do not modify or summarize code) +- Preserving direct quotes and specific requests +- Keeping technical details and requirements +- Maintaining the user's intent + +Target length: approximately {target_ratio:.0%} of original. + +User message: +{content} + +Summary:""" + +_ASSISTANT_SUMMARIZE_PROMPT = """Summarize the following assistant response aggressively to bullet points: +- Keep only key decisions ("I decided to...", "I'll use...") +- Keep only final conclusions and answers +- Remove explanations, elaborations, and filler +- Preserve any code that was provided + +Target length: approximately {target_ratio:.0%} of original. 
+ +Assistant response: +{content} + +Summary:""" + + +def estimate_tokens(text: str) -> int: + """Estimate token count using ~4 chars per token heuristic.""" + return len(text) // 4 + + +def content_hash(content: str) -> str: + """Generate a hash for content deduplication.""" + return hashlib.sha256(content.encode()).hexdigest()[:16] + + +def _ensure_conversation_dir(memory_root: Path, conversation_id: str) -> Path: + """Ensure conversation directory exists and return path.""" + conv_dir = memory_root / _LONG_CONVO_DIR / conversation_id + conv_dir.mkdir(parents=True, exist_ok=True) + (conv_dir / _SEGMENTS_DIR).mkdir(exist_ok=True) + return conv_dir + + +def _segment_filename(segment: Segment, index: int) -> str: + """Generate filename for a segment.""" + ts = segment.timestamp.strftime("%Y%m%d-%H%M%S") + return f"{index:06d}_{segment.role}_{ts}.md" + + +def _render_segment_file(segment: Segment) -> str: + """Render segment as markdown with YAML frontmatter.""" + metadata = { + "id": segment.id, + "role": segment.role, + "timestamp": segment.timestamp.isoformat(), + "original_tokens": segment.original_tokens, + "current_tokens": segment.current_tokens, + "state": segment.state, + "content_hash": segment.content_hash, + } + if segment.summary: + metadata["summary"] = segment.summary + if segment.refers_to: + metadata["refers_to"] = segment.refers_to + if segment.diff: + metadata["diff"] = segment.diff + + front_matter = yaml.safe_dump(metadata, default_flow_style=False, allow_unicode=True) + return f"---\n{front_matter}---\n\n{segment.content}\n" + + +def _parse_segment_file(path: Path) -> Segment | None: + """Parse a segment markdown file.""" + try: + text = path.read_text(encoding="utf-8") + except OSError: + LOGGER.warning("Failed to read segment file: %s", path) + return None + + if not text.startswith("---"): + return None + + parts = text.split("---", 2) + if len(parts) < _FRONTMATTER_PARTS: + return None + + try: + metadata = yaml.safe_load(parts[1]) + content = parts[2].strip() + except yaml.YAMLError: + LOGGER.warning("Failed to parse segment frontmatter: %s", path) + return None + + return Segment( + id=metadata["id"], + role=metadata["role"], + content=content, + timestamp=datetime.fromisoformat(metadata["timestamp"]), + original_tokens=metadata["original_tokens"], + current_tokens=metadata["current_tokens"], + state=metadata.get("state", "raw"), + summary=metadata.get("summary"), + refers_to=metadata.get("refers_to"), + diff=metadata.get("diff"), + content_hash=metadata.get("content_hash", ""), + ) + + +def save_segment( + memory_root: Path, + conversation_id: str, + segment: Segment, + index: int, +) -> Path: + """Save a segment to disk.""" + conv_dir = _ensure_conversation_dir(memory_root, conversation_id) + segments_dir = conv_dir / _SEGMENTS_DIR + filename = _segment_filename(segment, index) + path = segments_dir / filename + content = _render_segment_file(segment) + atomic_write_text(path, content) + LOGGER.debug("Saved segment %s to %s", segment.id, path) + return path + + +def load_segments(memory_root: Path, conversation_id: str) -> list[Segment]: + """Load all segments for a conversation, sorted by filename (chronological).""" + conv_dir = memory_root / _LONG_CONVO_DIR / conversation_id / _SEGMENTS_DIR + if not conv_dir.exists(): + return [] + + segments = [] + for path in sorted(conv_dir.glob("*.md")): + segment = _parse_segment_file(path) + if segment: + segments.append(segment) + + return segments + + +def save_conversation_metadata( + memory_root: Path, + 
conversation: LongConversation, +) -> None: + """Save conversation metadata (excluding segment contents).""" + conv_dir = _ensure_conversation_dir(memory_root, conversation.id) + metadata = { + "id": conversation.id, + "target_context_tokens": conversation.target_context_tokens, + "current_total_tokens": conversation.current_total_tokens, + "compress_threshold": conversation.compress_threshold, + "raw_recent_tokens": conversation.raw_recent_tokens, + "segment_count": len(conversation.segments), + } + path = conv_dir / _METADATA_FILE + atomic_write_text(path, json.dumps(metadata, indent=2)) + + +def load_conversation( + memory_root: Path, + conversation_id: str, + *, + target_context_tokens: int = 150_000, + compress_threshold: float = 0.8, + raw_recent_tokens: int = 40_000, +) -> LongConversation: + """Load a conversation from disk, or create new if not exists.""" + conv_dir = memory_root / _LONG_CONVO_DIR / conversation_id + metadata_path = conv_dir / _METADATA_FILE + + # Load metadata if exists + if metadata_path.exists(): + try: + metadata = json.loads(metadata_path.read_text()) + target_context_tokens = metadata.get( + "target_context_tokens", + target_context_tokens, + ) + compress_threshold = metadata.get("compress_threshold", compress_threshold) + raw_recent_tokens = metadata.get("raw_recent_tokens", raw_recent_tokens) + except (OSError, json.JSONDecodeError): + LOGGER.warning("Failed to load conversation metadata, using defaults") + + # Load segments + segments = load_segments(memory_root, conversation_id) + + # Calculate total tokens + total_tokens = sum(s.current_tokens for s in segments) + + return LongConversation( + id=conversation_id, + segments=segments, + target_context_tokens=target_context_tokens, + current_total_tokens=total_tokens, + compress_threshold=compress_threshold, + raw_recent_tokens=raw_recent_tokens, + ) + + +def create_segment( + role: str, + content: str, + *, + state: str = "raw", +) -> Segment: + """Create a new segment from content.""" + tokens = estimate_tokens(content) + return Segment( + id=str(uuid4()), + role=role, # type: ignore[arg-type] + content=content, + timestamp=datetime.now(UTC), + original_tokens=tokens, + current_tokens=tokens, + state=state, # type: ignore[arg-type] + content_hash=content_hash(content), + ) + + +def append_segment( + memory_root: Path, + conversation: LongConversation, + segment: Segment, +) -> LongConversation: + """Append a segment to the conversation and persist.""" + # Add to conversation + conversation.segments.append(segment) + conversation.current_total_tokens += segment.current_tokens + + # Persist segment + index = len(conversation.segments) + save_segment(memory_root, conversation.id, segment, index) + + # Update metadata + save_conversation_metadata(memory_root, conversation) + + return conversation + + +def get_recent_segments( + conversation: LongConversation, + max_tokens: int | None = None, +) -> list[Segment]: + """Get recent segments up to max_tokens (newest last).""" + if max_tokens is None: + max_tokens = conversation.raw_recent_tokens + + result: list[Segment] = [] + token_count = 0 + + # Iterate from newest to oldest + for segment in reversed(conversation.segments): + if token_count + segment.current_tokens > max_tokens: + break + result.append(segment) + token_count += segment.current_tokens + + # Reverse to maintain chronological order + return list(reversed(result)) + + +def get_older_segments( + conversation: LongConversation, + recent_count: int, +) -> list[Segment]: + """Get segments older than 
the recent window.""" + if recent_count >= len(conversation.segments): + return [] + return conversation.segments[:-recent_count] + + +def build_context( + conversation: LongConversation, + new_message: str, + token_budget: int, + system_prompt: str | None = None, +) -> list[dict[str, str]]: + """Build context for LLM request, enforcing token budget. + + For Phase 1: No compression, just trim oldest messages if needed. + """ + messages: list[dict[str, str]] = [] + + # 1. System message (required) + if system_prompt: + system_msg = {"role": "system", "content": system_prompt} + messages.append(system_msg) + reserved_tokens = estimate_tokens(system_prompt) + else: + reserved_tokens = 0 + + # 2. New user message (required, reserve space) + new_user_msg = {"role": "user", "content": new_message} + reserved_tokens += estimate_tokens(new_message) + + # 3. Calculate available budget for history + available = token_budget - reserved_tokens + + # 4. Get recent segments that fit in budget + recent_segments = get_recent_segments(conversation, max_tokens=available) + + # 5. Add history with appropriate content based on state + for seg in recent_segments: + if seg.state == "summarized" and seg.summary: + # Use summary for summarized segments + content = seg.summary + elif seg.state == "reference": + # Reference segments already have compact format with diff + content = seg.content + else: + # Raw segments use full content + content = seg.content + messages.append({"role": seg.role, "content": content}) + + # 6. Add new user message + messages.append(new_user_msg) + + return messages + + +def should_compress(conversation: LongConversation) -> bool: + """Check if compression is needed.""" + usage_ratio = conversation.current_total_tokens / conversation.target_context_tokens + return usage_ratio >= conversation.compress_threshold + + +def _count_recent_by_role(segments: list[Segment], role: str) -> int: + """Count how many of the most recent segments match a role.""" + count = 0 + for seg in reversed(segments): + if seg.role == role: + count += 1 + # Only count contiguous recent segments + if count >= COMPRESSION_CONFIG.get(role, {}).get("recent_turns", 10): + break + return count + + +def _is_recent_segment( + segment: Segment, + conversation: LongConversation, + segment_index: int, +) -> bool: + """Check if a segment is within the protected recent window.""" + # Check token-based recent window + token_count = 0 + for i in range(len(conversation.segments) - 1, segment_index - 1, -1): + token_count += conversation.segments[i].current_tokens + if token_count > conversation.raw_recent_tokens: + return segment_index > i + + # If we're within token budget, also check turn-based recent window + role_config = COMPRESSION_CONFIG.get(segment.role, {}) + recent_turns = role_config.get("recent_turns", 10) + + # Count how many segments of this role are after this one + turns_after = sum( + 1 for seg in conversation.segments[segment_index + 1 :] if seg.role == segment.role + ) + return turns_after < recent_turns + + +def select_segments_to_compress( + conversation: LongConversation, + target_reduction: int | None = None, +) -> list[Segment]: + """Select segments for compression, prioritizing assistant messages. + + Args: + conversation: The conversation to analyze + target_reduction: Target number of tokens to free up (optional) + + Returns: + List of segments to compress, ordered by compression priority + (assistant messages first, then by age - oldest first). 
+ + """ + candidates: list[tuple[int, Segment]] = [] + + for i, seg in enumerate(conversation.segments): + # Skip already compressed segments + if seg.state != "raw": + continue + # Skip system messages + if seg.role == "system": + continue + # Skip recent segments + if _is_recent_segment(seg, conversation, i): + continue + candidates.append((i, seg)) + + # Sort: assistant messages first (they get compressed more aggressively), + # then by timestamp (oldest first) + candidates.sort(key=lambda x: (x[1].role == "user", x[1].timestamp)) + + # If target_reduction specified, limit to segments needed + if target_reduction is not None: + selected = [] + potential_savings = 0 + for _i, seg in candidates: + role_config = COMPRESSION_CONFIG.get(seg.role, {}) + target_ratio = role_config.get("summary_target_ratio", 0.5) + savings = int(seg.current_tokens * (1 - target_ratio)) + selected.append(seg) + potential_savings += savings + if potential_savings >= target_reduction: + break + return selected + + return [seg for _i, seg in candidates] + + +# --- Compression --- + + +async def summarize_segment( + segment: Segment, + openai_base_url: str, + model: str, + api_key: str | None = None, +) -> str: + """Summarize a segment using LLM with role-appropriate prompts. + + Uses asymmetric compression: + - User messages: gentle summarization, preserve code and quotes + - Assistant messages: aggressive summarization to bullet points + """ + from agent_cli.core.openai_proxy import forward_chat_request # noqa: PLC0415 + from agent_cli.memory.models import ChatRequest, Message # noqa: PLC0415 + + role_config = COMPRESSION_CONFIG.get(segment.role, COMPRESSION_CONFIG["assistant"]) + target_ratio = role_config.get("summary_target_ratio", 0.5) + + # Select the appropriate prompt template + if segment.role == "user": + prompt = _USER_SUMMARIZE_PROMPT.format( + target_ratio=target_ratio, + content=segment.content, + ) + else: + prompt = _ASSISTANT_SUMMARIZE_PROMPT.format( + target_ratio=target_ratio, + content=segment.content, + ) + + # Create summarization request + request = ChatRequest( + messages=[Message(role="user", content=prompt)], + model=model, + stream=False, + ) + + response = await forward_chat_request( + request, + openai_base_url, + api_key, + exclude_fields=set(), + ) + + if not isinstance(response, dict): + LOGGER.warning("Unexpected response type during summarization: %s", type(response)) + return segment.content # Return original on failure + + summary = _extract_assistant_content(response) + if not summary: + LOGGER.warning("Failed to extract summary from response") + return segment.content # Return original on failure + + return summary + + +async def compress_segment( + memory_root: Path, + conversation: LongConversation, + segment: Segment, + openai_base_url: str, + model: str, + api_key: str | None = None, +) -> Segment: + """Compress a single segment and update it on disk. + + Returns the updated segment with summary and new token count. 
+ """ + LOGGER.info( + "Compressing segment %s (role=%s, tokens=%d)", + segment.id, + segment.role, + segment.current_tokens, + ) + + # Get summary from LLM + summary = await summarize_segment(segment, openai_base_url, model, api_key) + new_tokens = estimate_tokens(summary) + + # Update segment + segment.summary = summary + segment.current_tokens = new_tokens + segment.state = "summarized" + + # Find segment index and save to disk + for i, seg in enumerate(conversation.segments): + if seg.id == segment.id: + save_segment(memory_root, conversation.id, segment, i + 1) + break + + LOGGER.info( + "Compressed segment %s: %d → %d tokens (%.0f%% reduction)", + segment.id, + segment.original_tokens, + new_tokens, + (1 - new_tokens / segment.original_tokens) * 100 if segment.original_tokens > 0 else 0, + ) + + return segment + + +async def compress_conversation( + memory_root: Path, + conversation: LongConversation, + openai_base_url: str, + model: str, + api_key: str | None = None, +) -> LongConversation: + """Compress segments until under the threshold. + + Selects segments to compress prioritizing: + 1. Assistant messages (more aggressive compression) + 2. Older messages first + """ + if not should_compress(conversation): + return conversation + + # Calculate how many tokens we need to free + target_tokens = int(conversation.target_context_tokens * conversation.compress_threshold * 0.9) + tokens_to_free = conversation.current_total_tokens - target_tokens + + LOGGER.info( + "Conversation %s needs compression: %d tokens, target %d, need to free %d", + conversation.id, + conversation.current_total_tokens, + target_tokens, + tokens_to_free, + ) + + # Select segments to compress + segments_to_compress = select_segments_to_compress(conversation, tokens_to_free) + + if not segments_to_compress: + LOGGER.warning("No segments available for compression") + return conversation + + # Compress selected segments + total_saved = 0 + for segment in segments_to_compress: + old_tokens = segment.current_tokens + await compress_segment( + memory_root, + conversation, + segment, + openai_base_url, + model, + api_key, + ) + saved = old_tokens - segment.current_tokens + total_saved += saved + conversation.current_total_tokens -= saved + + # Stop if we've freed enough tokens + if total_saved >= tokens_to_free: + break + + # Update metadata + save_conversation_metadata(memory_root, conversation) + + LOGGER.info( + "Compression complete: freed %d tokens, now at %d (%.1f%% of target)", + total_saved, + conversation.current_total_tokens, + (conversation.current_total_tokens / conversation.target_context_tokens) * 100, + ) + + return conversation + + +# --- Repetition Detection --- + +# Minimum characters for a chunk to be considered for deduplication +_MIN_CHUNK_CHARS = 200 + +# Minimum similarity ratio to consider chunks as near-duplicates +_SIMILARITY_THRESHOLD = 0.85 + +# Only store diff if it's smaller than this fraction of original +_DIFF_SIZE_THRESHOLD = 0.3 + + +@dataclass +class TextChunk: + """A chunk of text extracted from content.""" + + content: str + start: int # Start position in original content + end: int # End position in original content + + +@dataclass +class RepetitionResult: + """Result of repetition detection.""" + + original_segment_id: str + original_chunk: str + new_chunk: TextChunk + diff: str + saved_tokens: int + + +def extract_chunks(content: str, min_chars: int = _MIN_CHUNK_CHARS) -> list[TextChunk]: + """Extract substantial text chunks from content. 
+ + Splits by double newlines (paragraphs) and filters by minimum size. + This catches code blocks, error messages, logs, or any repeated text. + """ + chunks = [] + pos = 0 + + for match in re.finditer(r"(.*?)(?:\n\n+|$)", content, re.DOTALL): + chunk_text = match.group(1).strip() + if len(chunk_text) >= min_chars: + # Find actual position in original content + start = content.find(chunk_text, pos) + if start != -1: + chunks.append( + TextChunk( + content=chunk_text, + start=start, + end=start + len(chunk_text), + ), + ) + pos = start + len(chunk_text) + + return chunks + + +def compute_similarity(text1: str, text2: str) -> float: + """Compute similarity ratio between two text chunks using difflib.""" + return difflib.SequenceMatcher(None, text1, text2).ratio() + + +def compute_unified_diff(original: str, modified: str) -> str: + """Compute unified diff between two text chunks.""" + original_lines = original.splitlines(keepends=True) + modified_lines = modified.splitlines(keepends=True) + + diff = difflib.unified_diff( + original_lines, + modified_lines, + fromfile="original", + tofile="modified", + lineterm="", + ) + return "".join(diff) + + +def detect_repetition( + new_content: str, + history: list[Segment], +) -> RepetitionResult | None: + """Find near-duplicate text chunks in new content compared to history. + + Works on any substantial text: code, error messages, logs, etc. + Returns the first significant repetition found, or None if no duplicates. + """ + new_chunks = extract_chunks(new_content) + if not new_chunks: + return None + + for chunk in new_chunks: + for seg in history: + # Only check user messages (they contain user-pasted content) + if seg.role != "user": + continue + + for hist_chunk in extract_chunks(seg.content): + similarity = compute_similarity(chunk.content, hist_chunk.content) + + if similarity >= _SIMILARITY_THRESHOLD: + diff = compute_unified_diff(hist_chunk.content, chunk.content) + diff_size = len(diff) + original_size = len(chunk.content) + + # Only use diff if it's significantly smaller + if diff_size < original_size * _DIFF_SIZE_THRESHOLD: + saved_tokens = estimate_tokens(chunk.content) - estimate_tokens( + diff, + ) + return RepetitionResult( + original_segment_id=seg.id, + original_chunk=hist_chunk.content, + new_chunk=chunk, + diff=diff, + saved_tokens=saved_tokens, + ) + + return None + + +def create_reference_content( + original_content: str, + repetition: RepetitionResult, +) -> str: + """Replace repeated chunk with a reference + diff. + + Finds the matching chunk and replaces it with a compact reference. + """ + chunk = repetition.new_chunk + + # Build replacement text + diff_marker = "" + if repetition.diff.strip(): + diff_marker = f"\n\n[Changes:\n{repetition.diff}]" + + replacement = f"[Similar to segment {repetition.original_segment_id}]{diff_marker}" + + # Replace the chunk with the reference + return original_content[: chunk.start] + replacement + original_content[chunk.end :] + + +def create_segment_with_dedup( + role: str, + content: str, + history: list[Segment], + *, + state: str = "raw", +) -> Segment: + """Create a new segment, detecting and deduplicating repeated text. + + If significant repetition is found, stores a reference instead of + the full content. 
+ """ + # Only check user messages for repetition + repetition = None + deduped_content = content + refers_to = None + diff = None + + if role == "user" and history: + repetition = detect_repetition(content, history) + if repetition: + deduped_content = create_reference_content(content, repetition) + refers_to = repetition.original_segment_id + diff = repetition.diff + LOGGER.info( + "Detected repetition (segment %s), saved %d tokens", + refers_to, + repetition.saved_tokens, + ) + state = "reference" + + tokens = estimate_tokens(deduped_content) + return Segment( + id=str(uuid4()), + role=role, # type: ignore[arg-type] + content=deduped_content, + timestamp=datetime.now(UTC), + original_tokens=estimate_tokens(content), # Original size + current_tokens=tokens, # After dedup + state=state, # type: ignore[arg-type] + refers_to=refers_to, + diff=diff, + content_hash=content_hash(content), + ) + + +def reconstruct_from_reference( + segment: Segment, + all_segments: list[Segment], +) -> str: + """Reconstruct full content from a reference segment. + + For now, returns the stored content with reference marker. + The LLM can understand the reference + diff format. + """ + if segment.state != "reference" or not segment.refers_to: + return segment.content + + # Find the original segment (for potential future reconstruction) + original_seg = None + for seg in all_segments: + if seg.id == segment.refers_to: + original_seg = seg + break + + if not original_seg: + LOGGER.warning( + "Could not find original segment %s for reference", + segment.refers_to, + ) + + # Return stored content - LLM understands reference format + return segment.content + + +# --- Chat Processing --- + + +def _extract_assistant_content(response: dict[str, Any]) -> str | None: + """Extract assistant content from a chat completion response.""" + choices = response.get("choices", []) + if not choices: + return None + message = choices[0].get("message") + if not message: + return None + return message.get("content") + + +async def process_long_conversation_chat( + memory_root: Path, + conversation_id: str, + messages: list[dict[str, str]], + model: str, + openai_base_url: str, + api_key: str | None = None, + *, + stream: bool = False, + target_context_tokens: int = 150_000, + compress_threshold: float = 0.8, + raw_recent_tokens: int = 40_000, +) -> Any: + """Process a chat request in long conversation mode. 
+ + Maintains chronological context with asymmetric compression: + - User messages compressed gently (preserve code, quotes) + - Assistant messages compressed aggressively (bullet points) + """ + from agent_cli.core.openai_proxy import forward_chat_request # noqa: PLC0415 + from agent_cli.memory.models import ChatRequest, Message # noqa: PLC0415 + + LOGGER.info("Long conversation chat: conversation=%s, model=%s", conversation_id, model) + + # Load or create conversation + conversation = load_conversation( + memory_root, + conversation_id, + target_context_tokens=target_context_tokens, + compress_threshold=compress_threshold, + raw_recent_tokens=raw_recent_tokens, + ) + + # Extract new user message from request + user_message = None + for msg in reversed(messages): + if msg.get("role") == "user": + user_message = msg.get("content", "") + break + + if not user_message: + LOGGER.warning("No user message found in request") + # Fall through with empty message + + # Build context using conversation history + context_messages = build_context( + conversation, + new_message=user_message or "", + token_budget=target_context_tokens, + system_prompt=None, # Could extract from messages if present + ) + + # Create augmented request + aug_messages = [Message(role=m["role"], content=m["content"]) for m in context_messages] + aug_request = ChatRequest( + messages=aug_messages, + model=model, + stream=stream, + ) + + # Streaming support will be added in Phase 2 + if stream: + LOGGER.warning( + "Streaming not yet supported in long conversation mode, falling back to non-streaming", + ) + aug_request.stream = False + + # Forward to LLM + response = await forward_chat_request( + aug_request, + openai_base_url, + api_key, + exclude_fields=set(), + ) + + if not isinstance(response, dict): + return response + + # Append user message as segment (with deduplication) + if user_message: + user_segment = create_segment_with_dedup( + "user", + user_message, + conversation.segments, + ) + append_segment(memory_root, conversation, user_segment) + + # Append assistant response as segment (no dedup for assistant) + assistant_content = _extract_assistant_content(response) + if assistant_content: + assistant_segment = create_segment("assistant", assistant_content) + append_segment(memory_root, conversation, assistant_segment) + + # Compress if needed (runs in background after returning response) + if should_compress(conversation): + LOGGER.info( + "Conversation %s exceeds compression threshold (%.1f%% of %d tokens), compressing...", + conversation_id, + (conversation.current_total_tokens / conversation.target_context_tokens) * 100, + conversation.target_context_tokens, + ) + await compress_conversation( + memory_root, + conversation, + openai_base_url, + model, + api_key, + ) + + return response diff --git a/agent_cli/memory/api.py b/agent_cli/memory/api.py index 0c457202..0b157069 100644 --- a/agent_cli/memory/api.py +++ b/agent_cli/memory/api.py @@ -32,26 +32,13 @@ def create_app( recency_weight: float = 0.2, score_threshold: float = 0.35, enable_git_versioning: bool = True, + # Long conversation mode settings + long_conversation: bool = False, + context_budget: int = 150_000, + compress_threshold: float = 0.8, + raw_recent_tokens: int = 40_000, ) -> FastAPI: """Create the FastAPI app for memory-backed chat.""" - LOGGER.info("Initializing memory client...") - - client = MemoryClient( - memory_path=memory_path, - openai_base_url=openai_base_url, - embedding_model=embedding_model, - embedding_api_key=embedding_api_key, - 
chat_api_key=chat_api_key, - default_top_k=default_top_k, - enable_summarization=enable_summarization, - max_entries=max_entries, - mmr_lambda=mmr_lambda, - recency_weight=recency_weight, - score_threshold=score_threshold, - start_watcher=False, # We control start/stop via app events - enable_git_versioning=enable_git_versioning, - ) - app = FastAPI(title="Memory Proxy") app.add_middleware( @@ -62,6 +49,39 @@ def create_app( allow_headers=["*"], ) + # Store config in app state for access in endpoints + app.state.memory_path = memory_path + app.state.openai_base_url = openai_base_url + app.state.chat_api_key = chat_api_key + app.state.long_conversation = long_conversation + app.state.context_budget = context_budget + app.state.compress_threshold = compress_threshold + app.state.raw_recent_tokens = raw_recent_tokens + + # Only initialize MemoryClient for standard mode + client: MemoryClient | None = None + if not long_conversation: + LOGGER.info("Initializing memory client (standard mode)...") + client = MemoryClient( + memory_path=memory_path, + openai_base_url=openai_base_url, + embedding_model=embedding_model, + embedding_api_key=embedding_api_key, + chat_api_key=chat_api_key, + default_top_k=default_top_k, + enable_summarization=enable_summarization, + max_entries=max_entries, + mmr_lambda=mmr_lambda, + recency_weight=recency_weight, + score_threshold=score_threshold, + start_watcher=False, + enable_git_versioning=enable_git_versioning, + ) + app.state.client = client + else: + LOGGER.info("Long conversation mode enabled, skipping standard memory client") + app.state.client = None + @app.post("/v1/chat/completions") async def chat_completions(request: Request, chat_request: ChatRequest) -> Any: auth_header = request.headers.get("Authorization") @@ -69,7 +89,28 @@ async def chat_completions(request: Request, chat_request: ChatRequest) -> Any: if auth_header and auth_header.startswith("Bearer "): api_key = auth_header.split(" ")[1] - return await client.chat( + if app.state.long_conversation: + # Long conversation mode + from agent_cli.memory._long_conversation import ( # noqa: PLC0415 + process_long_conversation_chat, + ) + + return await process_long_conversation_chat( + memory_root=app.state.memory_path, + conversation_id=chat_request.memory_id or "default", + messages=[m.model_dump() for m in chat_request.messages], + model=chat_request.model, + openai_base_url=app.state.openai_base_url, + api_key=api_key or app.state.chat_api_key, + stream=chat_request.stream or False, + target_context_tokens=app.state.context_budget, + compress_threshold=app.state.compress_threshold, + raw_recent_tokens=app.state.raw_recent_tokens, + ) + + # Standard memory mode + memory_client = app.state.client + return await memory_client.chat( messages=chat_request.messages, conversation_id=chat_request.memory_id or "default", model=chat_request.model, @@ -82,20 +123,25 @@ async def chat_completions(request: Request, chat_request: ChatRequest) -> Any: @app.on_event("startup") async def start_watch() -> None: - client.start() + if app.state.client: + app.state.client.start() @app.on_event("shutdown") async def stop_watch() -> None: - await client.stop() + if app.state.client: + await app.state.client.stop() @app.get("/health") def health() -> dict[str, str]: - return { + result = { "status": "ok", - "memory_store": str(client.memory_path), - "openai_base_url": client.openai_base_url, - "default_top_k": str(client.default_top_k), + "memory_store": str(app.state.memory_path), + "openai_base_url": 
app.state.openai_base_url, + "mode": "long_conversation" if app.state.long_conversation else "standard", } + if app.state.client: + result["default_top_k"] = str(app.state.client.default_top_k) + return result @app.api_route( "/{path:path}", @@ -106,8 +152,8 @@ async def proxy_catch_all(request: Request, path: str) -> Any: return await proxy_request_to_upstream( request, path, - client.openai_base_url, - client.chat_api_key, + app.state.openai_base_url, + app.state.chat_api_key, ) return app diff --git a/agent_cli/memory/entities.py b/agent_cli/memory/entities.py index 70b16a78..3f099b63 100644 --- a/agent_cli/memory/entities.py +++ b/agent_cli/memory/entities.py @@ -41,3 +41,56 @@ class Summary(BaseModel): content: str created_at: datetime # Summaries are role="summary" implicitly + + +# --- Long Conversation Mode Entities --- + + +class Segment(BaseModel): + """A single turn in a long conversation.""" + + id: str = Field(..., description="Unique UUID for this segment") + role: Literal["user", "assistant", "system"] + content: str + timestamp: datetime + + # Token accounting + original_tokens: int = Field(..., description="Token count of original content") + current_tokens: int = Field(..., description="Token count after compression") + + # Compression state + state: Literal["raw", "summarized", "reference"] = "raw" + + # For summarized segments + summary: str | None = Field(None, description="Summarized version of content") + + # For reference-type (deduplicated) segments + refers_to: str | None = Field(None, description="ID of original segment this references") + diff: str | None = Field(None, description="Diff from the referenced segment") + + # Content fingerprint for dedup + content_hash: str = "" + + +class LongConversation(BaseModel): + """Full conversation state for long conversation mode.""" + + id: str = Field(..., description="Unique conversation ID") + segments: list[Segment] = Field(default_factory=list) + + # Budget tracking + target_context_tokens: int = Field( + 150_000, + description="Target context window size (leave room for response)", + ) + current_total_tokens: int = Field(0, description="Current total tokens in conversation") + + # Compression thresholds + compress_threshold: float = Field( + 0.8, + description="Start compressing at this fraction of target", + ) + raw_recent_tokens: int = Field( + 40_000, + description="Always keep this many recent tokens raw", + ) diff --git a/docs/design-long-conversation-mode.md b/docs/design-long-conversation-mode.md new file mode 100644 index 00000000..10eb3232 --- /dev/null +++ b/docs/design-long-conversation-mode.md @@ -0,0 +1,438 @@ +# Long Conversation Mode: Design Document + +## Overview + +A new mode for `agent-cli memory proxy` that maintains a single, continuous conversation with intelligent compression, optimized for 100-200k token context windows. + +**Key insight**: User input is precious and hard to summarize without loss. LLM output is verbose and derivable. Compress asymmetrically. 
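
The asymmetric idea is easiest to see with a quick back-of-the-envelope calculation. The sketch below is illustrative only: it reuses the ~4-chars-per-token heuristic and the 0.7 (user) / 0.2 (assistant) target ratios that appear in `COMPRESSION_CONFIG` later in this patch; the `projected_tokens` helper and the sample history are hypothetical and not part of the implementation.

```python
# Illustrative estimate of asymmetric compression savings, assuming the
# ~4-chars-per-token heuristic and the 0.7 (user) / 0.2 (assistant)
# target ratios from COMPRESSION_CONFIG. Numbers are made up.

TARGET_RATIOS = {"user": 0.7, "assistant": 0.2}


def estimate_tokens(text: str) -> int:
    """Rough token estimate: ~4 characters per token."""
    return len(text) // 4


def projected_tokens(history: list[tuple[str, str]]) -> tuple[int, int]:
    """Return (raw_tokens, compressed_tokens) for (role, content) turns."""
    raw = sum(estimate_tokens(content) for _role, content in history)
    compressed = sum(
        int(estimate_tokens(content) * TARGET_RATIOS.get(role, 1.0))
        for role, content in history
    )
    return raw, compressed


if __name__ == "__main__":
    # A typical turn pair: short user prompt, verbose assistant answer.
    history = [("user", "x" * 800), ("assistant", "y" * 4_000)] * 50
    raw, compressed = projected_tokens(history)
    print(f"raw={raw:,} tokens, compressed~{compressed:,} tokens")
    # Most of the savings come from the assistant side, which is the point
    # of compressing asymmetrically.
```

With these illustrative numbers the history shrinks from roughly 60k to 17k estimated tokens, almost entirely by squeezing assistant turns; which segments actually get compressed, and when, is covered in the Compression Strategy section below.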
+ +## Motivation + +Current memory system (`memory proxy`): +- Extracts discrete facts from conversations +- Stores in vector DB, retrieves by semantic similarity +- Treats user and LLM text equally for compression +- Optimized for: many short conversations, cross-conversation retrieval + +New mode targets a different use case: +- Single long-running conversation (days/weeks/months) +- LLM learns about user over time +- Full context utilization (100-200k tokens) +- Preserve user intent, compress LLM verbosity + +## Integration Point + +Add `--long-conversation` flag to `memory proxy`: + +```bash +# Current behavior (fact extraction + semantic retrieval) +agent-cli memory proxy --memory-path ./memory_db + +# New mode (continuous conversation with asymmetric compression) +agent-cli memory proxy --memory-path ./memory_db --long-conversation +``` + +Both modes share: +- Same FastAPI proxy infrastructure +- Same storage directory structure +- Same upstream LLM forwarding + +They differ in: +- How context is built for each request +- What gets compressed and when +- How repetition is handled + +## Architecture + +### Current Flow (`memory proxy`) + +``` +Request → Extract user query + → Semantic search in ChromaDB + → Retrieve top-k facts + summary + → Inject into prompt + → Forward to LLM + → Extract facts from response (background) + → Store new facts +``` + +### Long Conversation Flow + +``` +Request → Append to conversation buffer + → Check if compression needed (approaching context limit) + → If yes: compress older segments (asymmetrically) + → Detect repetition in new content + → Build full context: compressed history + raw recent + → Forward to LLM + → Append response to buffer +``` + +## Data Model + +### Conversation Segment + +```python +@dataclass +class Segment: + """A single turn in the conversation.""" + id: str + role: Literal["user", "assistant", "system"] + content: str + timestamp: datetime + + # Token accounting + original_tokens: int + current_tokens: int # After compression + + # Compression state + state: Literal["raw", "summarized", "reference"] + + # For reference-type (deduplicated) segments + refers_to: str | None = None # ID of original segment + diff: str | None = None # What changed from original + + # Content fingerprint for dedup + content_hash: str = "" +``` + +### Conversation Store + +```python +@dataclass +class LongConversation: + """Full conversation with compression metadata.""" + id: str + segments: list[Segment] + + # Budget tracking + target_context_tokens: int = 150_000 # Leave room for response + current_total_tokens: int = 0 + + # Compression thresholds + compress_threshold: float = 0.8 # Start compressing at 80% of target + raw_recent_tokens: int = 40_000 # Always keep recent N tokens raw +``` + +## Compression Strategy + +### Tiered Compression + +| Content | Recent | Older | Very Old | +|---------|--------|-------|----------| +| **User messages** | Raw | Deduplicated | Light summary (preserve quotes) | +| **LLM messages** | Raw | Summarized to conclusions | Bullet points only | + +### Asymmetric Ratios + +```python +COMPRESSION_CONFIG = { + "user": { + "recent_turns": 20, # Keep last N user turns raw + "summary_target_ratio": 0.7, # Compress to 70% (gentle) + "preserve_quotes": True, # Keep exact user phrasing + "preserve_code": True, # Never summarize code blocks + }, + "assistant": { + "recent_turns": 10, # Keep last N assistant turns raw + "summary_target_ratio": 0.2, # Compress to 20% (aggressive) + "keep_decisions": True, # Preserve: "I decided 
to...", "I'll use..." + "keep_conclusions": True, # Preserve final answers + }, +} +``` + +### Compression Triggers + +```python +def should_compress(conversation: LongConversation) -> bool: + """Check if compression is needed.""" + usage_ratio = conversation.current_total_tokens / conversation.target_context_tokens + return usage_ratio >= conversation.compress_threshold + +def select_segments_to_compress(conversation: LongConversation) -> list[Segment]: + """Select oldest non-raw segments, preferring LLM content.""" + candidates = [] + for seg in conversation.segments: + if seg.state != "raw": + continue + # Skip recent segments + if is_recent(seg, conversation): + continue + candidates.append(seg) + + # Sort: LLM messages first (compress those more aggressively) + candidates.sort(key=lambda s: (s.role == "user", s.timestamp)) + return candidates +``` + +## Repetition Detection + +### Code Block Deduplication + +Users frequently paste the same code with minor changes: + +```python +def detect_code_repetition( + new_content: str, + history: list[Segment], +) -> RepetitionResult | None: + """Find near-duplicate code blocks.""" + new_blocks = extract_code_blocks(new_content) + + for block in new_blocks: + block_hash = hash_code_block(block) + + for seg in history: + if seg.role != "user": + continue + + for hist_block in extract_code_blocks(seg.content): + similarity = compute_similarity(block, hist_block) + + if similarity > 0.85: # Near-duplicate + diff = compute_unified_diff(hist_block, block) + diff_size = len(diff) + original_size = len(block) + + if diff_size < original_size * 0.3: + return RepetitionResult( + original_segment_id=seg.id, + diff=diff, + saved_tokens=count_tokens(block) - count_tokens(diff), + ) + + return None +``` + +### Replacement Format + +When repetition is detected, store: + +```markdown +[Code block similar to turn #42, with these changes:] +```diff +@@ -45,3 +45,5 @@ + def process(self): +- return self.data ++ validated = self.validate() ++ return self.transform(validated) +``` +``` + +## Context Building + +### Building the Prompt + +```python +def build_context( + conversation: LongConversation, + new_message: str, + token_budget: int, +) -> list[Message]: + """Build full context for LLM request, enforcing token budget.""" + messages = [] + + # 1. System message with conversation metadata (required) + system_msg = { + "role": "system", + "content": build_system_prompt(conversation), + } + messages.append(system_msg) + + # 2. New user message (required, reserve space) + new_user_msg = {"role": "user", "content": new_message} + reserved_tokens = count_tokens(system_msg) + count_tokens(new_user_msg) + + # 3. Recent raw conversation (high priority) + recent_segments = get_recent_segments(conversation) + recent_messages = [ + {"role": seg.role, "content": seg.content} + for seg in recent_segments + ] + + # 4. Compressed older history (lower priority, can be trimmed) + compressed_history = render_compressed_segments( + conversation.segments, + exclude_recent=True, + ) + history_msg = None + if compressed_history: + history_msg = { + "role": "system", + "content": f"Previous conversation (summarized):\n{compressed_history}", + } + + # 5. 
Enforce token budget by trimming older content first + available = token_budget - reserved_tokens + recent_tokens = sum(count_tokens(m) for m in recent_messages) + history_tokens = count_tokens(history_msg) if history_msg else 0 + + if recent_tokens + history_tokens > available: + # Drop summarized history first + if history_tokens > 0 and recent_tokens <= available: + history_msg = None + else: + # Trim oldest recent messages until we fit + while recent_messages and recent_tokens > available: + dropped = recent_messages.pop(0) + recent_tokens -= count_tokens(dropped) + + # 6. Assemble final message list + if history_msg: + messages.append(history_msg) + messages.extend(recent_messages) + messages.append(new_user_msg) + + return messages +``` + +## Storage + +### File Structure + +``` +memory_db/ +├── conversations/ +│ └── {conversation_id}/ +│ ├── segments/ +│ │ ├── 0001_user_2024-01-15T10:30:00.md +│ │ ├── 0002_assistant_2024-01-15T10:30:15.md +│ │ └── ... +│ ├── compressed/ +│ │ ├── batch_0001-0050.md # Compressed older segments +│ │ └── ... +│ └── metadata.json +└── index/ # ChromaDB (optional, for hybrid retrieval) +``` + +### Segment File Format + +```markdown +--- +id: seg_abc123 +role: user +timestamp: 2024-01-15T10:30:00Z +original_tokens: 450 +current_tokens: 450 +state: raw +content_hash: sha256:abcd1234... +--- + +Here's the updated module with the fix for the race condition: + +```python +class ConnectionPool: + ... +``` +``` + +## CLI Interface + +### New Flags + +```python +@memory_app.command("proxy") +def memory_proxy( + # ... existing flags ... + + # Long conversation mode + long_conversation: bool = typer.Option( + False, + "--long-conversation/--no-long-conversation", + help="Enable long conversation mode with asymmetric compression.", + rich_help_panel="Conversation Mode", + ), + context_budget: int = typer.Option( + 150_000, + "--context-budget", + help="Target context window size in tokens (long-conversation mode).", + rich_help_panel="Conversation Mode", + ), + compress_threshold: float = typer.Option( + 0.8, + "--compress-threshold", + help="Start compression when context reaches this fraction of budget.", + rich_help_panel="Conversation Mode", + ), + raw_recent_tokens: int = typer.Option( + 40_000, + "--raw-recent-tokens", + help="Always keep this many recent tokens uncompressed.", + rich_help_panel="Conversation Mode", + ), +) -> None: +``` + +### Example Usage + +```bash +# Start with 200k context budget, compress at 80% +agent-cli memory proxy \ + --long-conversation \ + --context-budget 200000 \ + --compress-threshold 0.8 \ + --raw-recent-tokens 50000 + +# Use with Claude (which has 200k context) +agent-cli memory proxy \ + --long-conversation \ + --openai-base-url https://api.anthropic.com/v1 \ + --context-budget 180000 +``` + +## Implementation Phases + +### Phase 1: Basic Long Conversation Storage +- [ ] `Segment` and `LongConversation` data models +- [ ] File-based persistence (segments as markdown files) +- [ ] Basic context building (no compression yet) +- [ ] New `--long-conversation` flag + +### Phase 2: Asymmetric Compression +- [ ] Token counting per segment +- [ ] Compression trigger logic +- [ ] LLM-based summarization for assistant messages +- [ ] Gentler summarization for user messages +- [ ] Preserve code blocks and quotes + +### Phase 3: Repetition Detection +- [ ] Code block extraction and hashing +- [ ] Similarity detection (diff-based) +- [ ] Reference storage format +- [ ] Context reconstruction from references + +### Phase 4: Hybrid Mode 
(Optional) +- [ ] Extract facts for semantic search (like current mode) +- [ ] Use semantic retrieval to supplement chronological context +- [ ] "What did we discuss about X?" queries + +## Open Questions + +1. **Absorption detection**: How do we know when LLM has "learned" something? + - Track references in later responses? + - Time-based heuristic (after N turns)? + - Explicit user signal? + +2. **Cross-session persistence**: How to handle proxy restarts? + - Load full conversation from disk on startup? + - Lazy loading with LRU cache? + +3. **Multiple conversations**: Support multiple long conversations? + - Use `memory_id` from request to switch? + - Separate storage per conversation? + +4. **Compression model**: Same model as chat, or smaller/faster? + - Latency impact of compression + - Quality vs speed tradeoff + +## Comparison + +| Aspect | Current `memory proxy` | Long Conversation Mode | +|--------|------------------------|------------------------| +| Context building | Semantic retrieval | Chronological + compressed | +| Compression | Symmetric | Asymmetric (user > LLM) | +| Repetition | None | Diff-based dedup | +| Retrieval | Vector similarity | Sequential history | +| Use case | Many short conversations | Single long conversation | +| Token budget | Implicit | Explicit (100-200k) | diff --git a/tests/memory/test_long_conversation.py b/tests/memory/test_long_conversation.py new file mode 100644 index 00000000..9354101c --- /dev/null +++ b/tests/memory/test_long_conversation.py @@ -0,0 +1,789 @@ +"""Integration tests for long conversation mode. + +Tests the full transformation pipeline: +- Messages in → context built → request to LLM +- Compression triggers and asymmetric behavior +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any +from unittest.mock import patch + +import pytest +from fastapi.testclient import TestClient + +from agent_cli.memory import api as memory_api +from agent_cli.memory._long_conversation import ( + append_segment, + build_context, + compute_similarity, + create_segment, + extract_chunks, + load_conversation, +) +from agent_cli.memory.entities import Segment + +if TYPE_CHECKING: + from pathlib import Path + + +@pytest.fixture +def captured_requests() -> list[dict[str, Any]]: + """Collect all requests sent to the LLM.""" + return [] + + +@pytest.fixture +def long_convo_client( + tmp_path: Path, + captured_requests: list[dict[str, Any]], +) -> TestClient: + """Create a long conversation mode client that captures LLM requests.""" + + async def _capture_request( + request: Any, + base_url: str, # noqa: ARG001 + api_key: str | None, # noqa: ARG001 + exclude_fields: set[str], # noqa: ARG001 + ) -> dict[str, Any]: + # Capture the request for inspection + captured_requests.append( + { + "messages": [{"role": m.role, "content": m.content} for m in request.messages], + "model": request.model, + }, + ) + # Return a mock response + return {"choices": [{"message": {"content": "Assistant response."}}]} + + with patch( + "agent_cli.core.openai_proxy.forward_chat_request", + side_effect=_capture_request, + ): + app = memory_api.create_app( + memory_path=tmp_path, + openai_base_url="http://mock-llm", + long_conversation=True, + context_budget=1000, # Small budget to trigger compression easily + compress_threshold=0.5, # Compress at 50% to trigger quickly + raw_recent_tokens=200, # Keep only ~200 tokens raw + ) + yield TestClient(app) + + +def test_first_message_goes_directly_to_llm( + long_convo_client: TestClient, + captured_requests: 
list[dict[str, Any]], +) -> None: + """First message should be passed to LLM with no history.""" + resp = long_convo_client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": "Hello!"}], + }, + ) + assert resp.status_code == 200 + + # Should have captured one request + assert len(captured_requests) == 1 + req = captured_requests[0] + + # Should contain just the user message + assert len(req["messages"]) == 1 + assert req["messages"][0]["role"] == "user" + assert req["messages"][0]["content"] == "Hello!" + + +def test_history_accumulates_across_requests( + long_convo_client: TestClient, + captured_requests: list[dict[str, Any]], +) -> None: + """Subsequent messages should include conversation history.""" + # First message + long_convo_client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": "Message 1"}], + }, + ) + + # Second message + long_convo_client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": "Message 2"}], + }, + ) + + # Second request should have history + assert len(captured_requests) == 2 + req = captured_requests[1] + + # Should have: user1, assistant1, user2 + assert len(req["messages"]) == 3 + assert req["messages"][0]["content"] == "Message 1" + assert req["messages"][1]["content"] == "Assistant response." + assert req["messages"][2]["content"] == "Message 2" + + +def test_segments_persisted_to_disk( + long_convo_client: TestClient, + tmp_path: Path, +) -> None: + """Messages should be persisted as segment files.""" + long_convo_client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": "Persist me"}], + }, + ) + + segments_dir = tmp_path / "long_conversations" / "default" / "segments" + files = list(segments_dir.glob("*.md")) + + # Should have user + assistant segments + assert len(files) == 2 + + # Check content + contents = [f.read_text() for f in sorted(files)] + assert "Persist me" in contents[0] + assert "Assistant response." 
in contents[1] + + +def test_compression_triggers_when_threshold_exceeded( + tmp_path: Path, + captured_requests: list[dict[str, Any]], +) -> None: + """Compression should trigger when token threshold is exceeded.""" + summarize_calls: list[str] = [] + + async def _capture_and_summarize( + request: Any, + base_url: str, # noqa: ARG001 + api_key: str | None, # noqa: ARG001 + exclude_fields: set[str], # noqa: ARG001 + ) -> dict[str, Any]: + content = request.messages[0].content + + # Detect if this is a summarization request + if "Summarize the following" in content: + summarize_calls.append(content) + return {"choices": [{"message": {"content": "Summary: brief."}}]} + + # Regular chat - capture and respond + captured_requests.append( + { + "messages": [{"role": m.role, "content": m.content} for m in request.messages], + }, + ) + # Return a long response to help trigger compression + return {"choices": [{"message": {"content": "A" * 500}}]} + + with patch( + "agent_cli.core.openai_proxy.forward_chat_request", + side_effect=_capture_and_summarize, + ): + app = memory_api.create_app( + memory_path=tmp_path, + openai_base_url="http://mock-llm", + long_conversation=True, + context_budget=300, # Very small budget + compress_threshold=0.3, # Compress at 30% + raw_recent_tokens=50, # Keep very few tokens raw + ) + client = TestClient(app) + + # Send several messages to trigger compression + for i in range(5): + client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": f"Message {i}: " + "x" * 100}], + }, + ) + + # Should have triggered summarization + assert len(summarize_calls) > 0, "Compression should have been triggered" + + +def test_assistant_messages_compressed_before_user( + tmp_path: Path, +) -> None: + """Assistant messages should be prioritized for compression.""" + summarize_calls: list[str] = [] + + async def _track_summarization( + request: Any, + base_url: str, # noqa: ARG001 + api_key: str | None, # noqa: ARG001 + exclude_fields: set[str], # noqa: ARG001 + ) -> dict[str, Any]: + content = request.messages[0].content + + if "Summarize the following" in content: + summarize_calls.append(content) + # Check which type of summarization + if "assistant response" in content.lower(): + return {"choices": [{"message": {"content": "• Decision made"}}]} + return {"choices": [{"message": {"content": "User asked about X"}}]} + + return {"choices": [{"message": {"content": "Long assistant response " + "y" * 200}}]} + + with patch( + "agent_cli.core.openai_proxy.forward_chat_request", + side_effect=_track_summarization, + ): + app = memory_api.create_app( + memory_path=tmp_path, + openai_base_url="http://mock-llm", + long_conversation=True, + context_budget=400, + compress_threshold=0.4, + raw_recent_tokens=100, + ) + client = TestClient(app) + + # Send messages to trigger compression + for i in range(6): + client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": f"User message {i}"}], + }, + ) + + # Check that assistant messages were compressed first + # (the prompt mentions "assistant response" for assistant messages) + if summarize_calls: + # First compressions should be assistant messages + first_call = summarize_calls[0] + assert "assistant" in first_call.lower(), ( + "Assistant messages should be compressed before user messages" + ) + + +def test_summarized_content_used_in_context( + tmp_path: Path, + captured_requests: list[dict[str, Any]], +) -> None: + """After compression, 
summaries should be used in context building.""" + + async def _summarize_handler( + request: Any, + base_url: str, # noqa: ARG001 + api_key: str | None, # noqa: ARG001 + exclude_fields: set[str], # noqa: ARG001 + ) -> dict[str, Any]: + content = request.messages[0].content + + if "Summarize the following" in content: + return {"choices": [{"message": {"content": "[SUMMARIZED]"}}]} + + # Capture for inspection + captured_requests.append( + { + "messages": [{"role": m.role, "content": m.content} for m in request.messages], + }, + ) + return {"choices": [{"message": {"content": "Response " + "z" * 150}}]} + + with patch( + "agent_cli.core.openai_proxy.forward_chat_request", + side_effect=_summarize_handler, + ): + app = memory_api.create_app( + memory_path=tmp_path, + openai_base_url="http://mock-llm", + long_conversation=True, + context_budget=300, + compress_threshold=0.3, + raw_recent_tokens=50, + ) + client = TestClient(app) + + # Send enough messages to trigger compression + for i in range(8): + client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": f"Msg {i}"}], + }, + ) + + # After compression, later requests should contain [SUMMARIZED] content + # (This is a soft check - compression timing can vary) + assert len(captured_requests) >= 5 + + +def test_separate_conversations_isolated( + long_convo_client: TestClient, + captured_requests: list[dict[str, Any]], +) -> None: + """Different memory_ids should have isolated histories.""" + # Message to conversation A + long_convo_client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": "Hello from A"}], + "memory_id": "convo-a", + }, + ) + + # Message to conversation B + long_convo_client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": "Hello from B"}], + "memory_id": "convo-b", + }, + ) + + # Second message to A - should only have A's history + long_convo_client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": "Second from A"}], + "memory_id": "convo-a", + }, + ) + + # Third request should only have convo A history + req = captured_requests[2] + contents = [m["content"] for m in req["messages"]] + + assert "Hello from A" in contents + assert "Hello from B" not in contents + assert "Second from A" in contents + + +# --- Phase 3: Repetition Detection Tests --- + + +def test_chunk_extraction() -> None: + """Test that text chunks are correctly extracted from content.""" + # Create content with substantial chunks separated by double newlines + chunk1 = "x" * 250 # Over 200 char minimum + chunk2 = "y" * 300 + content = f"{chunk1}\n\n{chunk2}" + + chunks = extract_chunks(content) + assert len(chunks) == 2 + assert chunks[0].content == chunk1 + assert chunks[1].content == chunk2 + + +def test_chunk_extraction_filters_small() -> None: + """Test that small chunks are filtered out.""" + small = "small text" # Under 200 chars + large = "z" * 250 + + content = f"{small}\n\n{large}" + chunks = extract_chunks(content) + + # Only the large chunk should be extracted + assert len(chunks) == 1 + assert chunks[0].content == large + + +def test_similarity_detection() -> None: + """Test that similar text chunks are detected.""" + text1 = """def process(self): + return self.data""" + + text2 = """def process(self): + validated = self.validate() + return self.data""" + + similarity = compute_similarity(text1, text2) + # These are 
similar but not identical + assert 0.6 < similarity < 1.0 + + # Identical text + identical = compute_similarity(text1, text1) + assert identical == 1.0 + + +def test_duplicate_text_creates_reference( + tmp_path: Path, + captured_requests: list[dict[str, Any]], +) -> None: + """When user pastes same text twice, second should be stored as reference.""" + # Large text block (over 200 chars, single paragraph - no blank lines) + large_text = """def calculate_total(items): + total = 0 + for item in items: + total += item.price * item.quantity + # Add tax calculation + tax = total * 0.1 + total += tax + return total +# This function processes the items and calculates the total price""" + + async def _capture_request( + request: Any, + base_url: str, # noqa: ARG001 + api_key: str | None, # noqa: ARG001 + exclude_fields: set[str], # noqa: ARG001 + ) -> dict[str, Any]: + captured_requests.append( + { + "messages": [{"role": m.role, "content": m.content} for m in request.messages], + }, + ) + return {"choices": [{"message": {"content": "Got it!"}}]} + + with patch( + "agent_cli.core.openai_proxy.forward_chat_request", + side_effect=_capture_request, + ): + app = memory_api.create_app( + memory_path=tmp_path, + openai_base_url="http://mock-llm", + long_conversation=True, + ) + client = TestClient(app) + + # First message with text + client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": f"Here's my code:\n\n{large_text}"}], + }, + ) + + # Second message with identical text + client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": f"Same code again:\n\n{large_text}"}], + }, + ) + + # Check segments on disk + segments_dir = tmp_path / "long_conversations" / "default" / "segments" + files = sorted(segments_dir.glob("*.md")) + + # Should have 4 segments: user1, assistant1, user2, assistant2 + assert len(files) == 4 + + # Read the second user segment (user2) + second_user_content = files[2].read_text() + + # Should contain a reference marker instead of full text + assert "Similar to segment" in second_user_content + + +def test_different_text_not_deduplicated( + tmp_path: Path, + captured_requests: list[dict[str, Any]], +) -> None: + """Completely different text should not be deduplicated.""" + # Two different large text blocks + text1 = """This is a completely different piece of text +that talks about something entirely unrelated to the second one. +It has enough characters to be considered a chunk for deduplication. +We need to make sure it's over 200 characters long to be extracted.""" + + text2 = """Another block of text that has no similarity to the first. +It discusses different topics and uses different words entirely. +The deduplication system should not find any match between these two. 
+This should remain as full content without any reference markers.""" + + async def _capture_request( + request: Any, + base_url: str, # noqa: ARG001 + api_key: str | None, # noqa: ARG001 + exclude_fields: set[str], # noqa: ARG001 + ) -> dict[str, Any]: + captured_requests.append( + { + "messages": [{"role": m.role, "content": m.content} for m in request.messages], + }, + ) + return {"choices": [{"message": {"content": "Got it!"}}]} + + with patch( + "agent_cli.core.openai_proxy.forward_chat_request", + side_effect=_capture_request, + ): + app = memory_api.create_app( + memory_path=tmp_path, + openai_base_url="http://mock-llm", + long_conversation=True, + ) + client = TestClient(app) + + # First message + client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": text1}], + }, + ) + + # Second message with completely different text + client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": text2}], + }, + ) + + # Check segments on disk + segments_dir = tmp_path / "long_conversations" / "default" / "segments" + files = sorted(segments_dir.glob("*.md")) + + # Read the second user segment + second_user_content = files[2].read_text() + + # Should NOT contain a reference marker - text is different + assert "Similar to segment" not in second_user_content + # Should contain the full text + assert "no similarity" in second_user_content + + +def test_reference_segment_used_in_context( + tmp_path: Path, + captured_requests: list[dict[str, Any]], +) -> None: + """Reference segments should be included in context with compact format.""" + # Large text block that will be deduplicated (over 200 chars, single paragraph) + large_text = """def very_long_function(): + # This is a long function with lots of lines that would take many tokens + # We want to deduplicate this content to save context space in conversations + result = process_something_important() + validated = validate_the_result(result) + transformed = transform_and_return(validated) + return transformed""" + + async def _capture_request( + request: Any, + base_url: str, # noqa: ARG001 + api_key: str | None, # noqa: ARG001 + exclude_fields: set[str], # noqa: ARG001 + ) -> dict[str, Any]: + captured_requests.append( + { + "messages": [{"role": m.role, "content": m.content} for m in request.messages], + }, + ) + return {"choices": [{"message": {"content": "Processed!"}}]} + + with patch( + "agent_cli.core.openai_proxy.forward_chat_request", + side_effect=_capture_request, + ): + app = memory_api.create_app( + memory_path=tmp_path, + openai_base_url="http://mock-llm", + long_conversation=True, + ) + client = TestClient(app) + + # First message with text + client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": f"First:\n\n{large_text}"}], + }, + ) + + # Second message with same text + client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": f"Again:\n\n{large_text}"}], + }, + ) + + # Third message - should see history with reference + client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": "What about my code?"}], + }, + ) + + # Check the third request's context + assert len(captured_requests) >= 3 + third_request = captured_requests[2] + + # The context should include the reference segment + all_content = " ".join(m["content"] for m in 
third_request["messages"]) + + # First mention should have full text + assert "very_long_function" in all_content + + +# --- Integration: Combined Features --- + + +def test_compression_and_deduplication_together( + tmp_path: Path, +) -> None: + """Test that compression and deduplication work together correctly. + + This tests the integration of: + - Phase 2: Compression (summarization of old segments) + - Phase 3: Deduplication (reference segments for repeated content) + """ + summarize_calls: list[str] = [] + + # Large text that will be repeated (over 200 chars, single paragraph) + repeated_code = """def process_data(items): + results = [] + for item in items: + validated = validate_item(item) + transformed = transform_item(validated) + results.append(transformed) + return results # Returns the processed list of items after validation and transformation""" + + async def _handle_request( + request: Any, + base_url: str, # noqa: ARG001 + api_key: str | None, # noqa: ARG001 + exclude_fields: set[str], # noqa: ARG001 + ) -> dict[str, Any]: + content = request.messages[0].content + + # Track summarization requests + if "Summarize the following" in content: + summarize_calls.append(content) + return {"choices": [{"message": {"content": "• Processed data items"}}]} + + # Return long responses to trigger compression + return {"choices": [{"message": {"content": "Response: " + "z" * 200}}]} + + with patch( + "agent_cli.core.openai_proxy.forward_chat_request", + side_effect=_handle_request, + ): + app = memory_api.create_app( + memory_path=tmp_path, + openai_base_url="http://mock-llm", + long_conversation=True, + context_budget=500, # Small budget to trigger compression + compress_threshold=0.4, # Compress at 40% + raw_recent_tokens=100, # Keep very few tokens raw + ) + client = TestClient(app) + + # 1. First message with code + client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": f"First code:\n\n{repeated_code}"}], + }, + ) + + # 2. Send several messages to trigger compression + for i in range(4): + client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": f"Question {i}"}], + }, + ) + + # 3. Send same code again - should be deduplicated + client.post( + "/v1/chat/completions", + json={ + "model": "test-model", + "messages": [{"role": "user", "content": f"Same code:\n\n{repeated_code}"}], + }, + ) + + # Verify both features worked + segments_dir = tmp_path / "long_conversations" / "default" / "segments" + files = sorted(segments_dir.glob("*.md")) + + # Should have multiple segments (12 = 6 user + 6 assistant) + assert len(files) >= 10 + + # Check for deduplication in the last user segment + last_user_segments = [f for f in files if "_user_" in f.name] + last_user_content = last_user_segments[-1].read_text() + assert "Similar to segment" in last_user_content, "Deduplication should have occurred" + + # Check for compression (summarization should have been triggered) + assert len(summarize_calls) > 0, "Compression should have been triggered" + + +def test_build_context_with_all_segment_states( + tmp_path: Path, +) -> None: + """Test build_context handles raw, summarized, and reference segments correctly.""" + # Create a conversation with all three segment states + convo = load_conversation(tmp_path, "test-context") + + # 1. 
Raw segment + raw_seg = create_segment("user", "This is raw content that has not been compressed.") + convo = append_segment(tmp_path, convo, raw_seg) + + # 2. Summarized segment (manually set state) + summarized_seg = Segment( + id="summarized-1", + role="assistant", + content="Original long response with lots of detail...", + timestamp=raw_seg.timestamp, + original_tokens=100, + current_tokens=20, + state="summarized", + summary="• Key point from response", + content_hash="abc123", + ) + convo = append_segment(tmp_path, convo, summarized_seg) + + # 3. Reference segment (manually set state) + reference_seg = Segment( + id="reference-1", + role="user", + content="[Similar to segment raw-1]\n\n[Changes:\n+small diff]", + timestamp=raw_seg.timestamp, + original_tokens=200, + current_tokens=30, + state="reference", + refers_to=raw_seg.id, + diff="+small diff", + content_hash="def456", + ) + convo = append_segment(tmp_path, convo, reference_seg) + + # Build context + context = build_context(convo, "New question", token_budget=10000) + + # Verify all segments are included correctly + assert len(context) == 4 # 3 history + 1 new message + + # Raw segment should use full content + assert context[0]["content"] == "This is raw content that has not been compressed." + + # Summarized segment should use summary + assert context[1]["content"] == "• Key point from response" + + # Reference segment should use stored content (with reference marker) + assert "Similar to segment" in context[2]["content"] + + # New message at the end + assert context[3]["content"] == "New question"
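
The final test pins down the contract the tests expect from `build_context`: raw segments contribute their stored content, summarized segments contribute their `summary`, reference segments contribute their stored (marker-bearing) content, and the new user message is appended last. The sketch below only restates that asserted contract; it is not the production implementation, and `SimpleSegment`, `segment_text`, and `sketch_build_context` are illustrative names invented here (the real `Segment` entity carries more fields).

```python
# Minimal sketch of the per-state content selection asserted by
# test_build_context_with_all_segment_states. SimpleSegment is a stand-in
# for the real Segment entity; only the fields needed here are modeled.
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class SimpleSegment:
    role: str
    content: str
    state: str = "raw"          # "raw" | "summarized" | "reference"
    summary: str | None = None  # populated when state == "summarized"


def segment_text(seg: SimpleSegment) -> str:
    """Pick the text a segment contributes to the rebuilt context."""
    if seg.state == "summarized" and seg.summary:
        return seg.summary  # compressed history is represented by its summary
    # Raw segments keep their full content; reference segments keep the stored
    # "[Similar to segment ...]" marker plus diff, which is what the test checks.
    return seg.content


def sketch_build_context(
    history: list[SimpleSegment],
    new_message: str,
) -> list[dict[str, str]]:
    """Chronological history followed by the new user message."""
    context = [{"role": seg.role, "content": segment_text(seg)} for seg in history]
    context.append({"role": "user", "content": new_message})
    return context
```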
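
The Phase 3 deduplication tests earlier in this file likewise fix a behavioral contract for `extract_chunks` and `compute_similarity`: chunks are blank-line-separated paragraphs, anything under roughly 200 characters is ignored, and similarity is a 0.0-1.0 ratio that is exactly 1.0 for identical text. The sketch below is a hedged approximation of that contract only; `sketch_extract_chunks`, `sketch_similarity`, and `MIN_CHUNK_CHARS` are names invented for this illustration, `difflib` is used purely as an example scorer, and the production helpers may use a different algorithm and return richer chunk objects (the tests access a `.content` attribute, which plain strings stand in for here).

```python
# Hedged approximation of the chunking/similarity behavior the dedup tests
# rely on. Not the production implementation.
import difflib

MIN_CHUNK_CHARS = 200  # threshold the tests describe ("over 200 chars")


def sketch_extract_chunks(content: str) -> list[str]:
    """Split on blank lines and keep only substantial paragraphs."""
    paragraphs = [p.strip() for p in content.split("\n\n")]
    return [p for p in paragraphs if len(p) >= MIN_CHUNK_CHARS]


def sketch_similarity(a: str, b: str) -> float:
    """Return a 0.0-1.0 similarity ratio; identical strings score 1.0."""
    return difflib.SequenceMatcher(None, a, b).ratio()
```

With inputs like the ones in `test_similarity_detection` (one snippet plus the same snippet with an extra inserted line), a ratio-style scorer lands strictly between 0.6 and 1.0, which is the band the test asserts; a second, clearly dissimilar paragraph scores low enough that no reference segment is created, matching `test_different_text_not_deduplicated`.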