Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 66 additions & 2 deletions sentience/agent_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,17 @@ async def get_url(self) -> str:
self._cached_url = url
return url

async def snapshot(self, **kwargs: Any) -> Snapshot:
async def snapshot(self, emit_trace: bool = True, **kwargs: Any) -> Snapshot:
"""
Take a snapshot of the current page state.

This updates last_snapshot which is used as context for assertions.
When emit_trace=True (default), automatically emits a 'snapshot' trace event
with screenshot_base64 for Sentience Studio visualization.

Args:
emit_trace: If True (default), emit a 'snapshot' trace event with screenshot.
Set to False to disable automatic trace emission.
**kwargs: Override default snapshot options for this call.
Common options:
- limit: Maximum elements to return
Expand All @@ -328,6 +332,15 @@ async def snapshot(self, **kwargs: Any) -> Snapshot:

Returns:
Snapshot of current page state

Example:
>>> # Default: snapshot with auto-emit trace event
>>> snapshot = await runtime.snapshot()

>>> # Disable auto-emit for manual control
>>> snapshot = await runtime.snapshot(emit_trace=False)
>>> # Later, manually emit if needed:
>>> tracer.emit_snapshot(snapshot, step_id=runtime.step_id)
"""
# Check if using legacy browser (backward compat)
if hasattr(self, "_legacy_browser") and hasattr(self, "_legacy_page"):
Expand All @@ -337,6 +350,9 @@ async def snapshot(self, **kwargs: Any) -> Snapshot:
if self._step_pre_snapshot is None:
self._step_pre_snapshot = self.last_snapshot
self._step_pre_url = self.last_snapshot.url
# Auto-emit trace for legacy path too
if emit_trace and self.last_snapshot is not None:
self._emit_snapshot_trace(self.last_snapshot)
return self.last_snapshot

# Use backend-agnostic snapshot
Expand All @@ -356,8 +372,33 @@ async def snapshot(self, **kwargs: Any) -> Snapshot:
self._step_pre_url = self.last_snapshot.url
if not skip_captcha_handling:
await self._handle_captcha_if_needed(self.last_snapshot, source="gateway")

# Auto-emit snapshot trace event for Studio visualization
if emit_trace and self.last_snapshot is not None:
self._emit_snapshot_trace(self.last_snapshot)

return self.last_snapshot

def _emit_snapshot_trace(self, snapshot: Snapshot) -> None:
"""
Emit a snapshot trace event with screenshot for Studio visualization.

This is called automatically by snapshot() when emit_trace=True.
"""
if self.tracer is None:
return

try:
self.tracer.emit_snapshot(
snapshot=snapshot,
step_id=self.step_id,
step_index=self.step_index,
screenshot_format="jpeg",
)
except Exception:
# Best-effort: don't let trace emission errors break snapshot
pass

async def sampled_snapshot(
self,
*,
Expand Down Expand Up @@ -903,18 +944,27 @@ def _artifact_metadata(self) -> dict[str, Any]:
"url": url,
}

def begin_step(self, goal: str, step_index: int | None = None) -> str:
def begin_step(
self,
goal: str,
step_index: int | None = None,
emit_trace: bool = True,
pre_url: str | None = None,
) -> str:
"""
Begin a new step in the verification loop.

This:
- Generates a new step_id
- Clears assertions from previous step
- Increments step_index (or uses provided value)
- Emits step_start trace event (optional)

Args:
goal: Description of what this step aims to achieve
step_index: Optional explicit step index (otherwise auto-increments)
emit_trace: If True (default), emit step_start trace event for Studio timeline
pre_url: Optional URL to record in step_start event (otherwise uses cached URL)

Returns:
Generated step_id in format 'step-N' where N is the step index
Expand All @@ -939,6 +989,20 @@ def begin_step(self, goal: str, step_index: int | None = None) -> str:
# Generate step_id in 'step-N' format for Studio compatibility
self.step_id = f"step-{self.step_index}"

# Emit step_start trace event for Studio timeline display
if emit_trace and self.tracer:
try:
url = pre_url or self._cached_url or ""
self.tracer.emit_step_start(
step_id=self.step_id,
step_index=self.step_index,
goal=goal,
attempt=0,
pre_url=url,
)
except Exception:
pass # Tracing must be non-fatal

return self.step_id

def assert_(
Expand Down
49 changes: 47 additions & 2 deletions sentience/tracer_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,32 @@
from sentience.tracing import JsonlTraceSink, Tracer


def _emit_run_start(
tracer: Tracer,
agent_type: str | None,
llm_model: str | None,
goal: str | None,
start_url: str | None,
) -> None:
"""
Helper to emit run_start event with available metadata.
"""
try:
config: dict[str, Any] = {}
if goal:
config["goal"] = goal
if start_url:
config["start_url"] = start_url

tracer.emit_run_start(
agent=agent_type or "SentienceAgent",
llm_model=llm_model,
config=config if config else None,
)
except Exception:
pass # Tracing must be non-fatal


def create_tracer(
api_key: str | None = None,
run_id: str | None = None,
Expand All @@ -29,6 +55,7 @@ def create_tracer(
llm_model: str | None = None,
start_url: str | None = None,
screenshot_processor: Callable[[str], str] | None = None,
auto_emit_run_start: bool = True,
) -> Tracer:
"""
Create tracer with automatic tier detection.
Expand Down Expand Up @@ -56,6 +83,9 @@ def create_tracer(
screenshot_processor: Optional function to process screenshots before upload.
Takes base64 string, returns processed base64 string.
Useful for PII redaction or custom image processing.
auto_emit_run_start: If True (default), automatically emit run_start event
with the provided metadata. This ensures traces have
complete structure for Studio visualization.

Returns:
Tracer configured with appropriate sink
Expand All @@ -71,6 +101,7 @@ def create_tracer(
... start_url="https://amazon.com"
... )
>>> # Returns: Tracer with CloudTraceSink
>>> # run_start event is automatically emitted
>>>
>>> # With screenshot processor for PII redaction
>>> def redact_pii(screenshot_base64: str) -> str:
Expand All @@ -87,6 +118,10 @@ def create_tracer(
>>> tracer = create_tracer(run_id="demo")
>>> # Returns: Tracer with JsonlTraceSink (local-only)
>>>
>>> # Disable auto-emit for manual control
>>> tracer = create_tracer(run_id="demo", auto_emit_run_start=False)
>>> tracer.emit_run_start("MyAgent", "gpt-4o") # Manual emit
>>>
>>> # Use with agent
>>> agent = SentienceAgent(browser, llm, tracer=tracer)
>>> agent.act("Click search")
Expand Down Expand Up @@ -136,7 +171,7 @@ def create_tracer(

if upload_url:
print("☁️ [Sentience] Cloud tracing enabled (Pro tier)")
return Tracer(
tracer = Tracer(
run_id=run_id,
sink=CloudTraceSink(
upload_url=upload_url,
Expand All @@ -147,6 +182,10 @@ def create_tracer(
),
screenshot_processor=screenshot_processor,
)
# Auto-emit run_start for complete trace structure
if auto_emit_run_start:
_emit_run_start(tracer, agent_type, llm_model, goal, start_url)
return tracer
else:
print("⚠️ [Sentience] Cloud init response missing upload_url")
print(f" Response data: {data}")
Expand Down Expand Up @@ -204,12 +243,18 @@ def create_tracer(
local_path = traces_dir / f"{run_id}.jsonl"
print(f"💾 [Sentience] Local tracing: {local_path}")

return Tracer(
tracer = Tracer(
run_id=run_id,
sink=JsonlTraceSink(str(local_path)),
screenshot_processor=screenshot_processor,
)

# Auto-emit run_start for complete trace structure
if auto_emit_run_start:
_emit_run_start(tracer, agent_type, llm_model, goal, start_url)

return tracer


def _recover_orphaned_traces(api_key: str, api_url: str = SENTIENCE_API_URL) -> None:
"""
Expand Down
62 changes: 62 additions & 0 deletions sentience/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,68 @@ def emit_error(
}
self.emit("error", data, step_id=step_id)

def emit_snapshot(
self,
snapshot: Any,
step_id: str | None = None,
step_index: int | None = None,
screenshot_format: str = "jpeg",
) -> None:
"""
Emit snapshot event with screenshot for Studio visualization.

This method builds and emits a 'snapshot' trace event that includes:
- Page URL and element data
- Screenshot (if present in snapshot)
- Step correlation info

Use this when you want screenshots to appear in the Sentience Studio timeline.

Args:
snapshot: Snapshot object (must have 'screenshot' attribute for images)
step_id: Step UUID (for correlating snapshot with a step)
step_index: Step index (0-based) for Studio timeline ordering
screenshot_format: Format of screenshot ("jpeg" or "png", default: "jpeg")

Example:
>>> # After taking a snapshot with AgentRuntime
>>> snapshot = await runtime.snapshot(screenshot=True)
>>> tracer.emit_snapshot(snapshot, step_id=runtime.step_id, step_index=runtime.step_index)

>>> # Or use auto-emit (default in AgentRuntime.snapshot())
>>> snapshot = await runtime.snapshot() # Auto-emits snapshot event
"""
if snapshot is None:
return

try:
# Import TraceEventBuilder here to avoid circular imports
from .trace_event_builder import TraceEventBuilder

# Build the snapshot event data
data = TraceEventBuilder.build_snapshot_event(snapshot, step_index=step_index)

# Extract and add screenshot if present
screenshot_raw = getattr(snapshot, "screenshot", None)
if screenshot_raw:
# Extract base64 string from data URL if needed
# Format: "data:image/jpeg;base64,{base64_string}"
if isinstance(screenshot_raw, str) and screenshot_raw.startswith("data:image"):
screenshot_base64 = (
screenshot_raw.split(",", 1)[1]
if "," in screenshot_raw
else screenshot_raw
)
else:
screenshot_base64 = screenshot_raw
data["screenshot_base64"] = screenshot_base64
data["screenshot_format"] = screenshot_format

self.emit("snapshot", data=data, step_id=step_id)
except Exception:
# Best-effort: don't let trace emission errors break the caller
pass

def set_final_status(self, status: str) -> None:
"""
Set the final status of the trace run.
Expand Down
44 changes: 44 additions & 0 deletions tests/test_agent_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class MockTracer:

def __init__(self) -> None:
self.events: list[dict] = []
self.emit_step_start_called: bool = False
self.emit_step_start_args: dict = {}

def emit(self, event_type: str, data: dict, step_id: str | None = None) -> None:
self.events.append(
Expand All @@ -85,6 +87,23 @@ def emit(self, event_type: str, data: dict, step_id: str | None = None) -> None:
}
)

def emit_step_start(
self,
step_id: str,
step_index: int,
goal: str,
attempt: int = 0,
pre_url: str | None = None,
) -> None:
self.emit_step_start_called = True
self.emit_step_start_args = {
"step_id": step_id,
"step_index": step_index,
"goal": goal,
"attempt": attempt,
"pre_url": pre_url,
}


class TestAgentRuntimeInit:
"""Tests for AgentRuntime initialization."""
Expand Down Expand Up @@ -277,6 +296,31 @@ def test_begin_step_clears_assertions(self) -> None:

assert runtime._assertions_this_step == []

def test_begin_step_emits_step_start_event(self) -> None:
"""Test begin_step emits step_start trace event by default."""
backend = MockBackend()
tracer = MockTracer()
runtime = AgentRuntime(backend=backend, tracer=tracer)

runtime.begin_step(goal="Test step", step_index=1)

# Check that emit_step_start was called
assert tracer.emit_step_start_called is True
assert tracer.emit_step_start_args["step_id"] == "step-1"
assert tracer.emit_step_start_args["step_index"] == 1
assert tracer.emit_step_start_args["goal"] == "Test step"

def test_begin_step_emit_trace_false(self) -> None:
"""Test begin_step with emit_trace=False skips trace event."""
backend = MockBackend()
tracer = MockTracer()
runtime = AgentRuntime(backend=backend, tracer=tracer)

runtime.begin_step(goal="Test step", step_index=1, emit_trace=False)

# Check that emit_step_start was NOT called
assert tracer.emit_step_start_called is False


class TestAgentRuntimeAssertions:
"""Tests for assertion methods."""
Expand Down
Loading
Loading