Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 78 additions & 54 deletions optillm/cepo/cepo.py

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion optillm/cepo/configs/cepo_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ planning_max_tokens_step3: 4096
planning_max_tokens_step4: 4096
use_plan_diversity: False
rating_model: null
use_reasoning: True
use_reasoning_fallback: False
num_of_retries: 0
print_output: False
print_output: False
3 changes: 2 additions & 1 deletion optillm/cepo/configs/cepo_config_gptoss.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ planning_max_tokens_step3: 40960
planning_max_tokens_step4: 40960
use_plan_diversity: False
rating_model: null
use_reasoning: True
use_reasoning_fallback: True
num_of_retries: 2
print_output: true
print_output: true
3 changes: 2 additions & 1 deletion optillm/cepo/configs/cepo_config_qwen3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ planning_max_tokens_step3: 20481
planning_max_tokens_step4: 20482
use_plan_diversity: False
rating_model: null
use_reasoning: True
use_reasoning_fallback: False
num_of_retries: 0
print_output: False
print_output: False
105 changes: 53 additions & 52 deletions optillm/conversation_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
import time
import copy

logger = logging.getLogger(__name__)

Expand All @@ -30,163 +31,163 @@ class ConversationEntry:
class ConversationLogger:
"""
Logger for OptiLLM conversations including all provider interactions and metadata.

Logs are saved in JSONL format (one JSON object per line) with daily rotation.
Each entry contains the full conversation including all intermediate provider calls.
"""

def __init__(self, log_dir: Path, enabled: bool = False):
self.enabled = enabled
self.log_dir = log_dir
self.active_entries: Dict[str, ConversationEntry] = {}
self._lock = threading.Lock()

if self.enabled:
self.log_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Conversation logging enabled. Logs will be saved to: {self.log_dir}")
else:
logger.debug("Conversation logging disabled")

def _get_log_file_path(self, timestamp: datetime = None) -> Path:
"""Get the log file path for a given timestamp (defaults to now)"""
if timestamp is None:
timestamp = datetime.now(timezone.utc)
date_str = timestamp.strftime("%Y-%m-%d")
return self.log_dir / f"conversations_{date_str}.jsonl"

def _generate_request_id(self) -> str:
"""Generate a unique request ID"""
return f"req_{uuid.uuid4().hex[:8]}"
def start_conversation(self,
client_request: Dict[str, Any],
approach: str,

def start_conversation(self,
client_request: Dict[str, Any],
approach: str,
model: str) -> str:
"""
Start logging a new conversation.

Args:
client_request: The original request from the client
approach: The optimization approach being used
model: The model name

Returns:
str: Unique request ID for this conversation
"""
if not self.enabled:
return ""

request_id = self._generate_request_id()
timestamp = datetime.now(timezone.utc).isoformat()

entry = ConversationEntry(
request_id=request_id,
timestamp=timestamp,
approach=approach,
model=model,
client_request=client_request.copy()
)

with self._lock:
self.active_entries[request_id] = entry

logger.debug(f"Started conversation logging for request {request_id}")
return request_id
def log_provider_call(self,
request_id: str,
provider_request: Dict[str, Any],

def log_provider_call(self,
request_id: str,
provider_request: Dict[str, Any],
provider_response: Dict[str, Any]) -> None:
"""
Log a provider API call and response.

Args:
request_id: The request ID for this conversation
provider_request: The request sent to the provider
provider_response: The response received from the provider
"""
if not self.enabled or not request_id:
return

with self._lock:
entry = self.active_entries.get(request_id)
if not entry:
logger.warning(f"No active conversation found for request {request_id}")
return

call_data = {
"call_number": len(entry.provider_calls) + 1,
"timestamp": datetime.now(timezone.utc).isoformat(),
"request": provider_request.copy(),
"response": provider_response.copy()
"request": provider_request and provider_request.copy() or None,
"response": provider_response and copy.copy(provider_response) or None # Responses are usually strs or dicts
}

entry.provider_calls.append(call_data)

logger.debug(f"Logged provider call #{len(entry.provider_calls)} for request {request_id}")
def log_final_response(self,
request_id: str,

def log_final_response(self,
request_id: str,
final_response: Dict[str, Any]) -> None:
"""
Log the final response sent back to the client.

Args:
request_id: The request ID for this conversation
final_response: The final response sent to the client
"""
if not self.enabled or not request_id:
return

with self._lock:
entry = self.active_entries.get(request_id)
if not entry:
logger.warning(f"No active conversation found for request {request_id}")
return

entry.final_response = final_response.copy()
entry.final_response["timestamp"] = datetime.now(timezone.utc).isoformat()

def log_error(self, request_id: str, error: str) -> None:
"""
Log an error for this conversation.

Args:
request_id: The request ID for this conversation
request_id: The request ID for this conversation
error: Error message or description
"""
if not self.enabled or not request_id:
return

with self._lock:
entry = self.active_entries.get(request_id)
if not entry:
logger.warning(f"No active conversation found for request {request_id}")
return

entry.error = error

logger.debug(f"Logged error for request {request_id}: {error}")

def finalize_conversation(self, request_id: str) -> None:
"""
Finalize and save the conversation to disk.

Args:
request_id: The request ID for this conversation
"""
if not self.enabled or not request_id:
return

with self._lock:
entry = self.active_entries.pop(request_id, None)
if not entry:
logger.warning(f"No active conversation found for request {request_id}")
return

# Calculate total duration
entry.total_duration_ms = int((time.time() - entry.start_time) * 1000)

# Convert to dict for JSON serialization
log_entry = {
"timestamp": entry.timestamp,
Expand All @@ -199,12 +200,12 @@ def finalize_conversation(self, request_id: str) -> None:
"total_duration_ms": entry.total_duration_ms,
"error": entry.error
}

# Write to log file
self._write_log_entry(log_entry)

logger.debug(f"Finalized conversation for request {request_id}")

def _write_log_entry(self, log_entry: Dict[str, Any]) -> None:
"""Write a log entry to the appropriate JSONL file"""
try:
Expand All @@ -215,18 +216,18 @@ def _write_log_entry(self, log_entry: Dict[str, Any]) -> None:
logger.debug(f"Wrote log entry to {log_file_path}")
except Exception as e:
logger.error(f"Failed to write log entry: {e}")

def get_stats(self) -> Dict[str, Any]:
"""Get statistics about conversation logging"""
with self._lock:
active_count = len(self.active_entries)

stats = {
"enabled": self.enabled,
"log_dir": str(self.log_dir),
"active_conversations": active_count
}

if self.enabled:
# Count total log files and approximate total entries
log_files = list(self.log_dir.glob("conversations_*.jsonl"))
Expand All @@ -237,12 +238,12 @@ def get_stats(self) -> Dict[str, Any]:
total_entries += sum(1 for line in f if line.strip())
except Exception:
pass

stats.update({
"log_files_count": len(log_files),
"total_entries_approximate": total_entries
})

return stats


Expand All @@ -262,4 +263,4 @@ def log_provider_call(request_id: str, provider_request: Dict[str, Any], provide
def log_error(request_id: str, error_message: str) -> None:
"""Log an error using the global logger instance"""
if _global_logger and _global_logger.enabled:
_global_logger.log_error(request_id, error_message)
_global_logger.log_error(request_id, error_message)
Loading
Loading